This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new ec608b0  IGNITE-15267 Check statistics obsolescence on server nodes 
only (#9318)
ec608b0 is described below

commit ec608b0b223671ed2517b5c05836ae52e6830b2a
Author: Berkof <sasha_bel...@mail.ru>
AuthorDate: Mon Oct 4 14:44:26 2021 +0700

    IGNITE-15267 Check statistics obsolescence on server nodes only (#9318)
---
 .../stat/IgniteStatisticsConfigurationManager.java | 142 +++++++++++++--------
 .../query/stat/IgniteStatisticsManagerImpl.java    |  87 +++++++------
 .../processors/query/stat/StatisticsGatherer.java  |  13 +-
 .../query/stat/StatisticsConfigurationTest.java    |  49 ++++++-
 4 files changed, 196 insertions(+), 95 deletions(-)

diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
index 12f9495..c81e146 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
@@ -29,7 +29,6 @@ import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterState;
@@ -113,34 +112,39 @@ public class IgniteStatisticsConfigurationManager {
     /** */
     private final GridCachePartitionExchangeManager exchange;
 
+    /** Is server node flag. */
+    private final boolean isServerNode;
+
     /** */
     private final DistributedMetastorageLifecycleListener distrMetaStoreLsnr = 
new DistributedMetastorageLifecycleListener() {
         @Override public void onReadyForRead(ReadableDistributedMetaStorage 
metastorage) {
             distrMetaStorage = (DistributedMetaStorage)metastorage;
 
-            distrMetaStorage.listen(
-                (metaKey) -> metaKey.startsWith(STAT_OBJ_PREFIX),
-                (k, oldV, newV) -> {
-                    // Skip invoke on start node (see 
'ReadableDistributedMetaStorage#listen' the second case)
-                    // The update statistics on start node is handled by 
'scanAndCheckLocalStatistic' method
-                    // called on exchange done.
-                    if (!started)
-                        return;
-
-                    mgmtPool.submit(() -> {
-                        try {
-                            onChangeStatisticConfiguration(
-                                (StatisticsObjectConfiguration)oldV,
-                                (StatisticsObjectConfiguration)newV
-                            );
-                        }
-                        catch (Throwable e) {
-                            log.warning("Unexpected exception on change 
statistic configuration [old="
-                                + oldV + ", new=" + newV + ']', e);
-                        }
-                    });
-                }
-            );
+            if (isServerNode) {
+                distrMetaStorage.listen(
+                    (metaKey) -> metaKey.startsWith(STAT_OBJ_PREFIX),
+                    (k, oldV, newV) -> {
+                        // Skip invoke on start node (see 
'ReadableDistributedMetaStorage#listen' the second case)
+                        // The update statistics on start node is handled by 
'scanAndCheckLocalStatistic' method
+                        // called on exchange done.
+                        if (!started)
+                            return;
+
+                        mgmtPool.submit(() -> {
+                            try {
+                                onChangeStatisticConfiguration(
+                                    (StatisticsObjectConfiguration) oldV,
+                                    (StatisticsObjectConfiguration) newV
+                                );
+                            }
+                            catch (Throwable e) {
+                                log.warning("Unexpected exception on change 
statistic configuration [old="
+                                    + oldV + ", new=" + newV + ']', e);
+                            }
+                        });
+                    }
+                );
+            }
         }
     };
 
@@ -218,7 +222,20 @@ public class IgniteStatisticsConfigurationManager {
         }
     };
 
-    /** */
+    /**
+     * Constructor.
+     *
+     * @param schemaMgr Schema manager.
+     * @param subscriptionProcessor Grid internal subscription processor.
+     * @param sysViewMgr Grid system view manager.
+     * @param cluster Grid cluster state processor.
+     * @param exchange Grid cache partition exchange manager.
+     * @param repo Ignite statistics repository.
+     * @param gatherer Statistics gatherer (or {@code null} for client nodes).
+     * @param mgmtPool Statistics management thread pool (or {@code null} for 
client nodes).
+     * @param logSupplier Ignite logger supplier.
+     * @param isServerNode server node flag.
+     */
     public IgniteStatisticsConfigurationManager(
         SchemaManager schemaMgr,
         GridInternalSubscriptionProcessor subscriptionProcessor,
@@ -228,7 +245,8 @@ public class IgniteStatisticsConfigurationManager {
         IgniteStatisticsRepository repo,
         StatisticsGatherer gatherer,
         IgniteThreadPoolExecutor mgmtPool,
-        Function<Class<?>, IgniteLogger> logSupplier
+        Function<Class<?>, IgniteLogger> logSupplier,
+        boolean isServerNode
     ) {
         this.schemaMgr = schemaMgr;
         log = logSupplier.apply(IgniteStatisticsConfigurationManager.class);
@@ -238,6 +256,7 @@ public class IgniteStatisticsConfigurationManager {
         this.cluster = cluster;
         this.subscriptionProcessor = subscriptionProcessor;
         this.exchange = exchange;
+        this.isServerNode = isServerNode;
 
         
this.subscriptionProcessor.registerDistributedMetastorageListener(distrMetaStoreLsnr);
 
@@ -308,15 +327,17 @@ public class IgniteStatisticsConfigurationManager {
         if (log.isTraceEnabled())
             log.trace("Statistics configuration manager starting...");
 
-        exchange.registerExchangeAwareComponent(exchAwareLsnr);
+        if (isServerNode) {
+            exchange.registerExchangeAwareComponent(exchAwareLsnr);
 
-        schemaMgr.registerDropColumnsListener(dropColsLsnr);
-        schemaMgr.registerDropTableListener(dropTblLsnr);
+            schemaMgr.registerDropColumnsListener(dropColsLsnr);
+            schemaMgr.registerDropTableListener(dropTblLsnr);
+        }
 
         if (log.isDebugEnabled())
             log.debug("Statistics configuration manager started.");
 
-        if (distrMetaStorage != null)
+        if (distrMetaStorage != null && isServerNode)
             scanAndCheckLocalStatistics(exchange.readyAffinityVersion());
     }
 
@@ -327,10 +348,12 @@ public class IgniteStatisticsConfigurationManager {
         if (log.isTraceEnabled())
             log.trace("Statistics configuration manager stopping...");
 
-        exchange.unregisterExchangeAwareComponent(exchAwareLsnr);
+        if (isServerNode) {
+            exchange.unregisterExchangeAwareComponent(exchAwareLsnr);
 
-        schemaMgr.unregisterDropColumnsListener(dropColsLsnr);
-        schemaMgr.unregisterDropTableListener(dropTblLsnr);
+            schemaMgr.unregisterDropColumnsListener(dropColsLsnr);
+            schemaMgr.unregisterDropTableListener(dropTblLsnr);
+        }
 
         if (log.isDebugEnabled())
             log.debug("Statistics configuration manager stopped.");
@@ -566,9 +589,9 @@ public class IgniteStatisticsConfigurationManager {
 
             AffinityTopologyVersion topVer0 = 
cctx.affinity().affinityReadyFuture(topVer).get();
 
-            final Set<Integer> parts = 
cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer0);
+            final Set<Integer> primParts = 
cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer0);
 
-            if (F.isEmpty(parts)) {
+            if (F.isEmpty(primParts)) {
                 // There is no data on the node for specified cache.
                 // Remove oll data
                 dropColumnsOnLocalStatistics(cfg, cfg.columns().keySet());
@@ -580,27 +603,32 @@ public class IgniteStatisticsConfigurationManager {
                 cctx.affinity().backupPartitions(cctx.localNodeId(), topVer0)
             );
 
-            partsOwn.addAll(parts);
+            partsOwn.addAll(primParts);
 
             if (log.isDebugEnabled())
-                log.debug("Check local statistics [key=" + cfg + ", parts=" + 
parts + ']');
+                log.debug("Check local statistics [key=" + cfg + ", parts=" + 
primParts + ']');
 
-            Collection<ObjectPartitionStatisticsImpl> partStats = 
repo.getLocalPartitionsStatistics(cfg.key());
+            Collection<ObjectPartitionStatisticsImpl> collectedPartStats = 
repo.getLocalPartitionsStatistics(cfg.key());
 
             Set<Integer> partsToRmv = new HashSet<>();
-            Set<Integer> partsToCollect = new HashSet<>(parts);
+            Set<Integer> partsToCollect = new HashSet<>(primParts);
+
+            /** All necessary partitions are already collected, so each possible 
column can be calculated from them. */
+            boolean allColumnsChecked = 
collectedPartStats.stream().map(ObjectPartitionStatisticsImpl::partId)
+                .collect(Collectors.toList()).containsAll(partsToCollect);
+
             Map<String, StatisticsColumnConfiguration> colsToCollect = new 
HashMap<>();
             Set<String> colsToRmv = new HashSet<>();
 
-            if (!F.isEmpty(partStats)) {
-                for (ObjectPartitionStatisticsImpl pstat : partStats) {
-                    if (!partsOwn.contains(pstat.partId()))
-                        partsToRmv.add(pstat.partId());
+            if (!F.isEmpty(collectedPartStats)) {
+                for (ObjectPartitionStatisticsImpl collPStat : 
collectedPartStats) {
+                    if (!partsOwn.contains(collPStat.partId()))
+                        partsToRmv.add(collPStat.partId());
 
-                    boolean partExists = true;
+                    boolean partFullyCollected = true;
 
                     for (StatisticsColumnConfiguration colCfg : 
cfg.columnsAll().values()) {
-                        ColumnStatistics colStat = 
pstat.columnStatistics(colCfg.name());
+                        ColumnStatistics colStat = 
collPStat.columnStatistics(colCfg.name());
 
                         if (colCfg.tombstone()) {
                             if (colStat != null)
@@ -610,15 +638,15 @@ public class IgniteStatisticsConfigurationManager {
                             if (colStat == null || colStat.version() < 
colCfg.version()) {
                                 colsToCollect.put(colCfg.name(), colCfg);
 
-                                partsToCollect.add(pstat.partId());
+                                partsToCollect.add(collPStat.partId());
 
-                                partExists = false;
+                                partFullyCollected = false;
                             }
                         }
                     }
 
-                    if (partExists)
-                        partsToCollect.remove(pstat.partId());
+                    if (partFullyCollected)
+                        partsToCollect.remove(collPStat.partId());
                 }
             }
 
@@ -638,15 +666,23 @@ public class IgniteStatisticsConfigurationManager {
             if (!F.isEmpty(colsToRmv))
                 dropColumnsOnLocalStatistics(cfg, colsToRmv);
 
-            if (!F.isEmpty(partsToCollect))
-                gatherLocalStatistics(cfg, tbl, parts, partsToCollect, 
colsToCollect);
+            // No columns to collect statistics by.
+            if (cfg.columns().isEmpty())
+                return Collections.emptySet();
+
+            if (!F.isEmpty(partsToCollect)) {
+                if (!allColumnsChecked)
+                    colsToCollect = cfg.columns();
+
+                gatherLocalStatistics(cfg, tbl, primParts, partsToCollect, 
colsToCollect);
+            }
             else {
                 // All local statistics by partition are available.
                 // Only refresh aggregated local statistics.
-                gatherer.aggregateStatisticsAsync(cfg.key(), () -> 
aggregateLocalGathering(cfg.key(), parts));
+                gatherer.aggregateStatisticsAsync(cfg.key(), () -> 
aggregateLocalGathering(cfg.key(), primParts));
             }
 
-            return parts;
+            return primParts;
         }
         catch (Throwable ex) {
             log.error("Unexpected error on check local statistics", ex);
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
index 2398263..3ff3fa6 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java
@@ -101,6 +101,8 @@ public class IgniteStatisticsManagerImpl implements 
IgniteStatisticsManager {
         this.ctx = ctx;
         this.schemaMgr = schemaMgr;
 
+        boolean serverNode = !(ctx.config().isClientMode() || ctx.isDaemon());
+
         helper = new IgniteStatisticsHelper(ctx.localNodeId(), schemaMgr, 
ctx::log);
 
         log = ctx.log(IgniteStatisticsManagerImpl.class);
@@ -108,30 +110,33 @@ public class IgniteStatisticsManagerImpl implements 
IgniteStatisticsManager {
         IgniteCacheDatabaseSharedManager db = 
(GridCacheUtils.isPersistenceEnabled(ctx.config())) ?
             ctx.cache().context().database() : null;
 
-        gatherPool = new IgniteThreadPoolExecutor("stat-gather",
-            ctx.igniteInstanceName(),
-            0,
-            STATS_POOL_SIZE,
-            IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
-            new LinkedBlockingQueue<>(),
-            GridIoPolicy.UNDEFINED,
-            ctx.uncaughtExceptionHandler()
-        );
-
-        mgmtPool = new IgniteThreadPoolExecutor("stat-mgmt",
-            ctx.igniteInstanceName(),
-            0,
-            1,
-            IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
-            new LinkedBlockingQueue<>(),
-            GridIoPolicy.UNDEFINED,
-            ctx.uncaughtExceptionHandler()
-        );
-
-        boolean storeData = !(ctx.config().isClientMode() || ctx.isDaemon());
+        if (serverNode) {
+            gatherPool = new IgniteThreadPoolExecutor("stat-gather",
+                ctx.igniteInstanceName(),
+                0,
+                STATS_POOL_SIZE,
+                IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
+                new LinkedBlockingQueue<>(),
+                GridIoPolicy.UNDEFINED,
+                ctx.uncaughtExceptionHandler()
+            );
+            mgmtPool = new IgniteThreadPoolExecutor("stat-mgmt",
+                ctx.igniteInstanceName(),
+                0,
+                1,
+                IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
+                new LinkedBlockingQueue<>(),
+                GridIoPolicy.UNDEFINED,
+                ctx.uncaughtExceptionHandler()
+            );
+        }
+        else {
+            gatherPool = null;
+            mgmtPool = null;
+        }
 
         IgniteStatisticsStore store;
-        if (!storeData)
+        if (!serverNode)
             store = new IgniteStatisticsDummyStoreImpl(ctx::log);
         else if (db == null)
             store = new IgniteStatisticsInMemoryStoreImpl(ctx::log);
@@ -140,11 +145,11 @@ public class IgniteStatisticsManagerImpl implements 
IgniteStatisticsManager {
 
         statsRepos = new IgniteStatisticsRepository(store, ctx.systemView(), 
helper, ctx::log);
 
-        gatherer = new StatisticsGatherer(
+        gatherer = serverNode ? new StatisticsGatherer(
             statsRepos,
             gatherPool,
             ctx::log
-        );
+        ) : null;
 
         statCfgMgr = new IgniteStatisticsConfigurationManager(
             schemaMgr,
@@ -155,7 +160,8 @@ public class IgniteStatisticsManagerImpl implements 
IgniteStatisticsManager {
             statsRepos,
             gatherer,
             mgmtPool,
-            ctx::log
+            ctx::log,
+            serverNode
         );
 
         
ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(dispatcher
 -> {
@@ -186,20 +192,23 @@ public class IgniteStatisticsManagerImpl implements 
IgniteStatisticsManager {
         if (currState == ON || currState == NO_UPDATE)
             enableOperations();
 
-        ctx.timeout().schedule(() -> {
-            StatisticsUsageState state = usageState();
-            if (state == ON && !ctx.isStopping()) {
-                if (log.isTraceEnabled())
-                    log.trace("Processing statistics obsolescence...");
+        if (serverNode) {
+            ctx.timeout().schedule(() -> {
+                StatisticsUsageState state = usageState();
 
-                try {
-                    processObsolescence();
-                } catch (Throwable e) {
-                    log.warning("Error while processing statistics 
obsolescence", e);
+                if (state == ON && !ctx.isStopping()) {
+                    if (log.isTraceEnabled())
+                        log.trace("Processing statistics obsolescence...");
+
+                    try {
+                        processObsolescence();
+                    } catch (Throwable e) {
+                        log.warning("Error while processing statistics 
obsolescence", e);
+                    }
                 }
-            }
 
-        }, OBSOLESCENCE_INTERVAL * 1000, OBSOLESCENCE_INTERVAL * 1000);
+            }, OBSOLESCENCE_INTERVAL * 1000, OBSOLESCENCE_INTERVAL * 1000);
+        }
     }
 
     /**
@@ -207,7 +216,8 @@ public class IgniteStatisticsManagerImpl implements 
IgniteStatisticsManager {
      */
     private synchronized void enableOperations() {
         statsRepos.start();
-        gatherer.start();
+        if (gatherer != null)
+            gatherer.start();
         statCfgMgr.start();
     }
 
@@ -216,7 +226,8 @@ public class IgniteStatisticsManagerImpl implements 
IgniteStatisticsManager {
      */
     private synchronized void disableOperations() {
         statCfgMgr.stop();
-        gatherer.stop();
+        if (gatherer != null)
+            gatherer.stop();
         statsRepos.stop();
     }
 
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsGatherer.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsGatherer.java
index 4c333fc..488b71f 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsGatherer.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsGatherer.java
@@ -272,11 +272,18 @@ public class StatisticsGatherer {
         if (log.isTraceEnabled())
             log.trace(String.format("Statistics gathering stopping %d 
task...", gatheringInProgress.size()));
 
-        gatheringInProgress.values().forEach(ctx -> 
ctx.futureGather().cancel(true));
-
-        gatheringInProgress.clear();
+        cancelAllTasks();
 
         if (log.isDebugEnabled())
             log.debug("Statistics gathering stopped.");
     }
+
+    /**
+     * Cancel all currently running statistics gathering tasks.
+     */
+    public void cancelAllTasks() {
+        gatheringInProgress.values().forEach(ctx -> 
ctx.futureGather().cancel(true));
+
+        gatheringInProgress.clear();
+    }
 }
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsConfigurationTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsConfigurationTest.java
index 746fbdd..be28aab 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsConfigurationTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsConfigurationTest.java
@@ -34,11 +34,14 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import 
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import 
org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
 import 
org.apache.ignite.internal.processors.query.stat.messages.StatisticsObjectData;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
 import org.jetbrains.annotations.NotNull;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -57,6 +60,13 @@ public class StatisticsConfigurationTest extends 
StatisticsAbstractTest {
     /** Columns to check.*/
     private static final String[] COLUMNS = {"A", "B", "C"};
 
+    /** Listener which catches client-side statistics store warning. */
+    private LogListener obsolescenceLsnr = LogListener
+        .matches("Unable to save statistics obsolescence info on non server 
node.").build();
+
+    /** Logger which tries to catch client-side statistics store warning. */
+    private final ListeningTestLogger obsolescenceAwareLog = new 
ListeningTestLogger(log(), obsolescenceLsnr);
+
     /** Lazy mode. */
     @Parameterized.Parameter(value = 0)
     public boolean persist;
@@ -110,7 +120,7 @@ public class StatisticsConfigurationTest extends 
StatisticsAbstractTest {
                         new DataRegionConfiguration()
                             .setPersistenceEnabled(persist)
                     )
-            );
+            ).setGridLogger(obsolescenceAwareLog);
     }
 
     /** {@inheritDoc} */
@@ -207,6 +217,43 @@ public class StatisticsConfigurationTest extends 
StatisticsAbstractTest {
     }
 
     /**
+     * Start client node and check no store related errors in log.
+     *
+     * @throws Exception In case of errors.
+     */
+    @Test
+    public void checkClientNode() throws Exception {
+        startGridAndChangeBaseline(0);
+
+        createSmallTable(null);
+
+        IgniteEx client = startClientGrid("cli");
+
+        collectStatistics(SMALL_TARGET);
+
+        sql("delete from small");
+
+        for (int i = 0; i < 1000; i++)
+            sql(String.format("INSERT INTO small(a, b, c) VALUES(%d, %d, %d)", 
i, i, i % 10));
+
+        StatisticsObjectConfiguration smallCfg = 
statisticsMgr(0).statisticConfiguration().config(SMALL_KEY);
+
+        statisticsMgr(client).refreshStatistics(SMALL_TARGET);
+
+        Thread.sleep(100);
+
+        StatisticsObjectConfiguration smallCfg2 = 
statisticsMgr(0).statisticConfiguration().config(SMALL_KEY);
+
+        assertNotSame(smallCfg.columns().get("A").version(), 
smallCfg2.columns().get("A").version());
+
+        client.cluster().state(ClusterState.INACTIVE);
+
+        client.cluster().state(ClusterState.ACTIVE);
+
+        assertFalse(obsolescenceLsnr.check(TIMEOUT));
+    }
+
+    /**
      * Check statistics on cluster after change topology.
      * 1. Create statistic for a table;
      * 2. Check statistics on all nodes of the cluster;

Reply via email to