This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new b27817afe2b IGNITE-21550 Expose CDC metrics via push metrics exporter 
spi (#11248)
b27817afe2b is described below

commit b27817afe2b159b0562f999be6fb5537ce129433
Author: Sergey Korotkov <serge.korot...@gmail.com>
AuthorDate: Thu Feb 29 21:51:47 2024 +0700

    IGNITE-21550 Expose CDC metrics via push metrics exporter spi (#11248)
---
 .../org/apache/ignite/internal/cdc/CdcMain.java    |  5 ++
 .../wal/reader/StandaloneGridKernalContext.java    | 24 +++++-
 .../org/apache/ignite/cdc/AbstractCdcTest.java     |  1 +
 .../ignite/cdc/CdcPushMetricsExporterTest.java     | 92 ++++++++++++++++++++++
 .../ignite/testsuites/IgnitePdsTestSuite2.java     |  2 +
 .../ignitetest/services/utils/cdc/ignite_cdc.py    |  4 +-
 .../opencensus/OpenCensusMetricExporterSpi.java    |  6 +-
 7 files changed, 128 insertions(+), 6 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java 
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index 2f9269b18b1..a85cd9bf858 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -78,6 +78,7 @@ import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
 import org.apache.ignite.spi.metric.noop.NoopMetricExporterSpi;
 import org.apache.ignite.startup.cmdline.CdcCommandLineStartup;
 
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
 import static org.apache.ignite.internal.IgniteKernal.NL;
 import static org.apache.ignite.internal.IgniteKernal.SITE;
 import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
@@ -321,6 +322,8 @@ public class CdcMain implements Runnable {
             throw new IllegalArgumentException(ERR_MSG);
         }
 
+        System.setProperty(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK, 
"true");
+
         try (CdcFileLockHolder lock = lockPds()) {
             String consIdDir = cdcDir.getName(cdcDir.getNameCount() - 
1).toString();
 
@@ -416,6 +419,8 @@ public class CdcMain implements Runnable {
 
         startAllComponents(kctx);
 
+        kctx.metric().onKernalStart(true);
+
         mreg = kctx.metric().registry("cdc");
 
         return kctx;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index af8a24f503c..a2da615224c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -137,6 +137,9 @@ public class StandaloneGridKernalContext implements 
GridKernalContext {
     /** System view manager. */
     private final GridSystemViewManager sysViewMgr;
 
+    /** Timeout processor. */
+    private final GridTimeoutProcessor timeoutProc;
+
     /** */
     @GridToStringExclude
     private CacheObjectTransformerProcessor transProc;
@@ -201,6 +204,7 @@ public class StandaloneGridKernalContext implements 
GridKernalContext {
         rsrcProc = new GridResourceProcessor(this);
         metricMgr = new GridMetricManager(this);
         sysViewMgr = new GridSystemViewManager(this);
+        timeoutProc = new GridTimeoutProcessor(this);
         transProc = createComponent(CacheObjectTransformerProcessor.class);
 
         // Fake folder provided to perform processor startup on empty folder.
@@ -212,6 +216,7 @@ public class StandaloneGridKernalContext implements 
GridKernalContext {
         comps.add(rsrcProc);
         comps.add(cacheObjProcessor);
         comps.add(metricMgr);
+        comps.add(timeoutProc);
 
         if (marshallerMappingFileStoreDir != null) {
             
marshallerCtx.setMarshallerMappingFileStoreDir(marshallerMappingFileStoreDir);
@@ -306,7 +311,20 @@ public class StandaloneGridKernalContext implements 
GridKernalContext {
 
     /** {@inheritDoc} */
     @Override public IgniteEx grid() {
-        final IgniteEx kernal = new IgniteKernal();
+        final IgniteEx kernal = new IgniteKernal() {
+            /**
+             * Override to return the non-null context instance to make metric 
SPIs happy.<br>
+             *
+             * Say the SqlViewMetricExporterSpi one which may be automatically 
added by
+             * the {@link GridMetricManager} if indexing or query engine are 
found in classpath
+             * (which is the default behaviour).
+             *
+             * @return Kernal context.
+             */
+            @Override public GridKernalContext context() {
+                return StandaloneGridKernalContext.this;
+            }
+        };
         try {
             setField(kernal, "cfg", cfg);
             setField(kernal, "igniteInstanceName", 
cfg.getIgniteInstanceName());
@@ -319,7 +337,7 @@ public class StandaloneGridKernalContext implements 
GridKernalContext {
 
     /** */
     private void setField(IgniteEx kernal, String name, Object val) throws 
NoSuchFieldException, IllegalAccessException {
-        Field field = kernal.getClass().getDeclaredField(name);
+        Field field = kernal.getClass().getSuperclass().getDeclaredField(name);
         field.setAccessible(true);
         field.set(kernal, val);
     }
@@ -346,7 +364,7 @@ public class StandaloneGridKernalContext implements 
GridKernalContext {
 
     /** {@inheritDoc} */
     @Override public GridTimeoutProcessor timeout() {
-        return null;
+        return timeoutProc;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java 
b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
index dcab0bcc441..c3ad5e227ef 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java
@@ -117,6 +117,7 @@ public abstract class AbstractCdcTest extends 
GridCommonAbstractTest {
 
         cdcCfg.setConsumer(cnsmr);
         cdcCfg.setKeepBinary(keepBinary());
+        cdcCfg.setMetricExporterSpi(metricExporters());
 
         return new CdcMain(cfg, null, cdcCfg) {
             @Override protected CdcConsumerState createState(Path stateDir) {
diff --git 
a/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java
 
b/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java
new file mode 100644
index 00000000000..db1a6f43a84
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.metric.PushMetricsExporterAdapter;
+import org.apache.ignite.spi.metric.MetricExporterSpi;
+import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
+import org.junit.Test;
+
+/** */
+public class CdcPushMetricsExporterTest extends AbstractCdcTest {
+    /** */
+    private final AtomicBoolean metricsExported = new AtomicBoolean(false);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setCdcEnabled(true)));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected MetricExporterSpi[] metricExporters() {
+        PushMetricsExporterAdapter pushMetricsExporter = new 
PushMetricsExporterAdapter() {
+            @Override public void export() {
+                metricsExported.set(true);
+            }
+        };
+
+        pushMetricsExporter.setPeriod(100);
+
+        return new MetricExporterSpi[] {
+            pushMetricsExporter,
+            new JmxMetricExporterSpi(),
+        };
+    }
+
+    /** Test checks that metrics are exported via exporter based on the push 
metrics exporter adapter. */
+    @Test
+    public void testPushMetricsExporter() throws Exception {
+        IgniteConfiguration cfg = getConfiguration("ignite-0");
+
+        Ignite ign = startGrid(cfg);
+
+        IgniteCache<Integer, User> cache = 
ign.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        addAndWaitForConsumption(
+            new UserCdcConsumer(),
+            cfg,
+            cache,
+            null,
+            CdcPushMetricsExporterTest::addData,
+            0,
+            KEYS_CNT,
+            true
+        );
+
+        assertTrue(metricsExported.get());
+    }
+
+    /** */
+    public static void addData(IgniteCache<Integer, User> cache, int from, int 
to) {
+        for (int i = from; i < to; i++)
+            cache.put(i, createUser(i));
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 7d2ea125dbe..0d46f7390b1 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -25,6 +25,7 @@ import org.apache.ignite.cdc.CdcCacheVersionTest;
 import org.apache.ignite.cdc.CdcIgniteNodeActiveModeTest;
 import org.apache.ignite.cdc.CdcManagerTest;
 import org.apache.ignite.cdc.CdcNonDefaultWorkDirTest;
+import org.apache.ignite.cdc.CdcPushMetricsExporterTest;
 import org.apache.ignite.cdc.CdcSelfTest;
 import org.apache.ignite.cdc.RestartWithWalForceArchiveTimeoutTest;
 import org.apache.ignite.cdc.TransformedCdcSelfTest;
@@ -166,6 +167,7 @@ public class IgnitePdsTestSuite2 {
         GridTestUtils.addTestIfNeeded(suite, CdcNonDefaultWorkDirTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CdcManagerTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
CdcIgniteNodeActiveModeTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, CdcPushMetricsExporterTest.class, 
ignoredTests);
 
         // new style folders with generated consistent ID test
         GridTestUtils.addTestIfNeeded(suite, 
IgniteUidAsConsistentIdMigrationTest.class, ignoredTests);
diff --git 
a/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_cdc.py 
b/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_cdc.py
index 481547dd2d0..dc91f93343a 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_cdc.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_cdc.py
@@ -17,6 +17,7 @@
 """
 This module contains Ignite CDC utility (ignite-cdc.sh) wrapper.
 """
+import os
 import re
 import signal
 from copy import deepcopy
@@ -93,7 +94,8 @@ class IgniteCdcUtility(JvmProcessMixin):
 
         envs["CDC_JVM_OPTS"] = f"\"{' '.join(cdc_jvm_opts)}\""
 
-        return f"{envs_to_exports(envs)} bash " + self.cluster.script(cmd)
+        return (f"{envs_to_exports(envs)} bash " + self.cluster.script(cmd) +
+                f" 2>&1 | tee -a {os.path.join(self.cluster.log_dir, 
'ignite-cdc-console.log')} &")
 
     @staticmethod
     def __parse_output(raw_output):
diff --git 
a/modules/opencensus/src/main/java/org/apache/ignite/spi/metric/opencensus/OpenCensusMetricExporterSpi.java
 
b/modules/opencensus/src/main/java/org/apache/ignite/spi/metric/opencensus/OpenCensusMetricExporterSpi.java
index 12b83645c65..db70f520ad6 100644
--- 
a/modules/opencensus/src/main/java/org/apache/ignite/spi/metric/opencensus/OpenCensusMetricExporterSpi.java
+++ 
b/modules/opencensus/src/main/java/org/apache/ignite/spi/metric/opencensus/OpenCensusMetricExporterSpi.java
@@ -292,8 +292,10 @@ public class OpenCensusMetricExporterSpi extends 
PushMetricsExporterAdapter {
     @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) 
throws IgniteSpiException {
         super.onContextInitialized0(spiCtx);
 
-        consistenIdValue = TagValue.create(
-            
((IgniteEx)ignite()).context().discovery().localNode().consistentId().toString());
+        if (sendConsistentId) {
+            consistenIdValue = TagValue.create(
+                
((IgniteEx)ignite()).context().discovery().localNode().consistentId().toString());
+        }
     }
 
     /**

Reply via email to