This is an automated email from the ASF dual-hosted git repository. timoninmaxim pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new b27817afe2b IGNITE-21550 Expose CDC metrics via push metrics exporter spi (#11248) b27817afe2b is described below commit b27817afe2b159b0562f999be6fb5537ce129433 Author: Sergey Korotkov <serge.korot...@gmail.com> AuthorDate: Thu Feb 29 21:51:47 2024 +0700 IGNITE-21550 Expose CDC metrics via push metrics exporter spi (#11248) --- .../org/apache/ignite/internal/cdc/CdcMain.java | 5 ++ .../wal/reader/StandaloneGridKernalContext.java | 24 +++++- .../org/apache/ignite/cdc/AbstractCdcTest.java | 1 + .../ignite/cdc/CdcPushMetricsExporterTest.java | 92 ++++++++++++++++++++++ .../ignite/testsuites/IgnitePdsTestSuite2.java | 2 + .../ignitetest/services/utils/cdc/ignite_cdc.py | 4 +- .../opencensus/OpenCensusMetricExporterSpi.java | 6 +- 7 files changed, 128 insertions(+), 6 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java index 2f9269b18b1..a85cd9bf858 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java @@ -78,6 +78,7 @@ import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi; import org.apache.ignite.spi.metric.noop.NoopMetricExporterSpi; import org.apache.ignite.startup.cmdline.CdcCommandLineStartup; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK; import static org.apache.ignite.internal.IgniteKernal.NL; import static org.apache.ignite.internal.IgniteKernal.SITE; import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR; @@ -321,6 +322,8 @@ public class CdcMain implements Runnable { throw new IllegalArgumentException(ERR_MSG); } + System.setProperty(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK, "true"); + try (CdcFileLockHolder lock = lockPds()) { String consIdDir = cdcDir.getName(cdcDir.getNameCount() - 1).toString(); @@ -416,6 +419,8 @@ public class CdcMain implements Runnable { startAllComponents(kctx); + kctx.metric().onKernalStart(true); + mreg = kctx.metric().registry("cdc"); return kctx; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index af8a24f503c..a2da615224c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -137,6 +137,9 @@ public class StandaloneGridKernalContext implements GridKernalContext { /** System view manager. */ private final GridSystemViewManager sysViewMgr; + /** Timeout processor. */ + private final GridTimeoutProcessor timeoutProc; + /** */ @GridToStringExclude private CacheObjectTransformerProcessor transProc; @@ -201,6 +204,7 @@ public class StandaloneGridKernalContext implements GridKernalContext { rsrcProc = new GridResourceProcessor(this); metricMgr = new GridMetricManager(this); sysViewMgr = new GridSystemViewManager(this); + timeoutProc = new GridTimeoutProcessor(this); transProc = createComponent(CacheObjectTransformerProcessor.class); // Fake folder provided to perform processor startup on empty folder. @@ -212,6 +216,7 @@ public class StandaloneGridKernalContext implements GridKernalContext { comps.add(rsrcProc); comps.add(cacheObjProcessor); comps.add(metricMgr); + comps.add(timeoutProc); if (marshallerMappingFileStoreDir != null) { marshallerCtx.setMarshallerMappingFileStoreDir(marshallerMappingFileStoreDir); @@ -306,7 +311,20 @@ public class StandaloneGridKernalContext implements GridKernalContext { /** {@inheritDoc} */ @Override public IgniteEx grid() { - final IgniteEx kernal = new IgniteKernal(); + final IgniteEx kernal = new IgniteKernal() { + /** + * Override to return the non-null context instance to make metric SPIs happy.<br> + * + * Say the SqlViewMetricExporterSpi one which may be automatically added by + * the {@link GridMetricManager} if indexing or query engine are found in classpath + * (which is the default behaviour). + * + * @return Kernal context. + */ + @Override public GridKernalContext context() { + return StandaloneGridKernalContext.this; + } + }; try { setField(kernal, "cfg", cfg); setField(kernal, "igniteInstanceName", cfg.getIgniteInstanceName()); @@ -319,7 +337,7 @@ public class StandaloneGridKernalContext implements GridKernalContext { /** */ private void setField(IgniteEx kernal, String name, Object val) throws NoSuchFieldException, IllegalAccessException { - Field field = kernal.getClass().getDeclaredField(name); + Field field = kernal.getClass().getSuperclass().getDeclaredField(name); field.setAccessible(true); field.set(kernal, val); } @@ -346,7 +364,7 @@ public class StandaloneGridKernalContext implements GridKernalContext { /** {@inheritDoc} */ @Override public GridTimeoutProcessor timeout() { - return null; + return timeoutProc; } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java index dcab0bcc441..c3ad5e227ef 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java @@ -117,6 +117,7 @@ public abstract class AbstractCdcTest extends GridCommonAbstractTest { cdcCfg.setConsumer(cnsmr); cdcCfg.setKeepBinary(keepBinary()); + cdcCfg.setMetricExporterSpi(metricExporters()); return new CdcMain(cfg, null, cdcCfg) { @Override protected CdcConsumerState createState(Path stateDir) { diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java new file mode 100644 index 00000000000..db1a6f43a84 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cdc; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.metric.PushMetricsExporterAdapter; +import org.apache.ignite.spi.metric.MetricExporterSpi; +import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi; +import org.junit.Test; + +/** */ +public class CdcPushMetricsExporterTest extends AbstractCdcTest { + /** */ + private final AtomicBoolean metricsExported = new AtomicBoolean(false); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT) + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setCdcEnabled(true))); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected MetricExporterSpi[] metricExporters() { + PushMetricsExporterAdapter pushMetricsExporter = new PushMetricsExporterAdapter() { + @Override public void export() { + metricsExported.set(true); + } + }; + + pushMetricsExporter.setPeriod(100); + + return new MetricExporterSpi[] { + pushMetricsExporter, + new JmxMetricExporterSpi(), + }; + } + + /** Test checks that metrics are exported via exporter based on the push metrics exporter adapter. */ + @Test + public void testPushMetricsExporter() throws Exception { + IgniteConfiguration cfg = getConfiguration("ignite-0"); + + Ignite ign = startGrid(cfg); + + IgniteCache<Integer, User> cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME); + + addAndWaitForConsumption( + new UserCdcConsumer(), + cfg, + cache, + null, + CdcPushMetricsExporterTest::addData, + 0, + KEYS_CNT, + true + ); + + assertTrue(metricsExported.get()); + } + + /** */ + public static void addData(IgniteCache<Integer, User> cache, int from, int to) { + for (int i = from; i < to; i++) + cache.put(i, createUser(i)); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index 7d2ea125dbe..0d46f7390b1 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -25,6 +25,7 @@ import org.apache.ignite.cdc.CdcCacheVersionTest; import org.apache.ignite.cdc.CdcIgniteNodeActiveModeTest; import org.apache.ignite.cdc.CdcManagerTest; import org.apache.ignite.cdc.CdcNonDefaultWorkDirTest; +import org.apache.ignite.cdc.CdcPushMetricsExporterTest; import org.apache.ignite.cdc.CdcSelfTest; import org.apache.ignite.cdc.RestartWithWalForceArchiveTimeoutTest; import org.apache.ignite.cdc.TransformedCdcSelfTest; @@ -166,6 +167,7 @@ public class IgnitePdsTestSuite2 { GridTestUtils.addTestIfNeeded(suite, CdcNonDefaultWorkDirTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CdcManagerTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CdcIgniteNodeActiveModeTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, CdcPushMetricsExporterTest.class, ignoredTests); // new style folders with generated consistent ID test GridTestUtils.addTestIfNeeded(suite, IgniteUidAsConsistentIdMigrationTest.class, ignoredTests); diff --git a/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_cdc.py b/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_cdc.py index 481547dd2d0..dc91f93343a 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_cdc.py +++ b/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_cdc.py @@ -17,6 +17,7 @@ """ This module contains Ignite CDC utility (ignite-cdc.sh) wrapper. """ +import os import re import signal from copy import deepcopy @@ -93,7 +94,8 @@ class IgniteCdcUtility(JvmProcessMixin): envs["CDC_JVM_OPTS"] = f"\"{' '.join(cdc_jvm_opts)}\"" - return f"{envs_to_exports(envs)} bash " + self.cluster.script(cmd) + return (f"{envs_to_exports(envs)} bash " + self.cluster.script(cmd) + + f" 2>&1 | tee -a {os.path.join(self.cluster.log_dir, 'ignite-cdc-console.log')} &") @staticmethod def __parse_output(raw_output): diff --git a/modules/opencensus/src/main/java/org/apache/ignite/spi/metric/opencensus/OpenCensusMetricExporterSpi.java b/modules/opencensus/src/main/java/org/apache/ignite/spi/metric/opencensus/OpenCensusMetricExporterSpi.java index 12b83645c65..db70f520ad6 100644 --- a/modules/opencensus/src/main/java/org/apache/ignite/spi/metric/opencensus/OpenCensusMetricExporterSpi.java +++ b/modules/opencensus/src/main/java/org/apache/ignite/spi/metric/opencensus/OpenCensusMetricExporterSpi.java @@ -292,8 +292,10 @@ public class OpenCensusMetricExporterSpi extends PushMetricsExporterAdapter { @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { super.onContextInitialized0(spiCtx); - consistenIdValue = TagValue.create( - ((IgniteEx)ignite()).context().discovery().localNode().consistentId().toString()); + if (sendConsistentId) { + consistenIdValue = TagValue.create( + ((IgniteEx)ignite()).context().discovery().localNode().consistentId().toString()); + } } /**