This is an automated email from the ASF dual-hosted git repository. timoninmaxim pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 2ebc8df1aa2 IGNITE-21678 Fix Cdc error with ignite to ignite streamer in case BinaryConfiguration is specified in config (#11275) 2ebc8df1aa2 is described below commit 2ebc8df1aa22ac105003360b57d83bbd0e9fc2df Author: Sergey Korotkov <serge.korot...@gmail.com> AuthorDate: Thu Mar 14 15:49:46 2024 +0700 IGNITE-21678 Fix Cdc error with ignite to ignite streamer in case BinaryConfiguration is specified in config (#11275) --- .../org/apache/ignite/internal/cdc/CdcMain.java | 9 +- .../wal/reader/StandaloneSpiContext.java | 250 +++++++++++++++++++++ .../ignite/cdc/CdcPushMetricsExporterTest.java | 126 ++++++++++- 3 files changed, 376 insertions(+), 9 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java index a85cd9bf858..33d015bfb34 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java @@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAhea import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneSpiContext; import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric; import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl; @@ -74,11 +75,11 @@ import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.marshaller.MarshallerUtils; import org.apache.ignite.platform.PlatformType; +import org.apache.ignite.spi.IgniteSpi; import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi; import org.apache.ignite.spi.metric.noop.NoopMetricExporterSpi; import org.apache.ignite.startup.cmdline.CdcCommandLineStartup; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK; import static org.apache.ignite.internal.IgniteKernal.NL; import static org.apache.ignite.internal.IgniteKernal.SITE; import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR; @@ -322,8 +323,6 @@ public class CdcMain implements Runnable { throw new IllegalArgumentException(ERR_MSG); } - System.setProperty(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK, "true"); - try (CdcFileLockHolder lock = lockPds()) { String consIdDir = cdcDir.getName(cdcDir.getNameCount() - 1).toString(); @@ -419,7 +418,9 @@ public class CdcMain implements Runnable { startAllComponents(kctx); - kctx.metric().onKernalStart(true); + for (IgniteSpi metricSpi : kctx.config().getMetricExporterSpi()) { + metricSpi.onContextInitialized(new StandaloneSpiContext()); + } mreg = kctx.metric().registry("cdc"); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneSpiContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneSpiContext.java new file mode 100644 index 00000000000..887d35739b3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneSpiContext.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.reader; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.managers.communication.GridMessageListener; +import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; +import org.apache.ignite.plugin.extensions.communication.MessageFormatter; +import org.apache.ignite.plugin.security.SecuritySubject; +import org.apache.ignite.spi.IgniteNodeValidationResult; +import org.apache.ignite.spi.IgnitePortProtocol; +import org.apache.ignite.spi.IgniteSpiContext; +import org.apache.ignite.spi.IgniteSpiTimeoutObject; +import org.apache.ignite.spi.discovery.DiscoveryDataBag; +import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry; +import org.jetbrains.annotations.Nullable; + +/** + * Dummy SPI context for offline utilities. + */ +public class StandaloneSpiContext implements IgniteSpiContext { + /** {@inheritDoc} */ + @Override public void addLocalEventListener(GridLocalEventListener lsnr, int... types) { + /* No-op. */ + } + + /** {@inheritDoc} */ + @Override public void addMessageListener(GridMessageListener lsnr, String topic) { + /* No-op. */ + } + + /** {@inheritDoc} */ + @Override public void addLocalMessageListener(Object topic, IgniteBiPredicate<UUID, ?> p) { + /* No-op. */ + } + + /** {@inheritDoc} */ + @Override public void recordEvent(Event evt) { + /* No-op. */ + } + + /** {@inheritDoc} */ + @Override public void registerPort(int port, IgnitePortProtocol proto) { + /* No-op. */ + } + + /** {@inheritDoc} */ + @Override public void deregisterPort(int port, IgnitePortProtocol proto) { + /* No-op. */ + } + + /** {@inheritDoc} */ + @Override public void deregisterPorts() { + /* No-op. */ + } + + /** {@inheritDoc} */ + @Override public <K, V> V get(String cacheName, K key) { + return null; + } + + /** {@inheritDoc} */ + @Override public <K, V> V put(String cacheName, K key, V val, long ttl) { + return null; + } + + /** {@inheritDoc} */ + @Override public <K, V> V putIfAbsent(String cacheName, K key, V val, long ttl) { + return null; + } + + /** {@inheritDoc} */ + @Override public <K, V> V remove(String cacheName, K key) { + return null; + } + + /** {@inheritDoc} */ + @Override public <K> boolean containsKey(String cacheName, K key) { + return false; + } + + /** {@inheritDoc} */ + @Override public int partition(String cacheName, Object key) { + return -1; + } + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> nodes() { + return null; + } + + /** {@inheritDoc} */ + @Override public ClusterNode localNode() { + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public ClusterNode node(UUID nodeId) { + return null; + } + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> remoteNodes() { + return Collections.emptyList(); + } + + /** {@inheritDoc} */ + @Override public boolean pingNode(UUID nodeId) { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean removeLocalEventListener(GridLocalEventListener lsnr) { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isEventRecordable(int... types) { + return true; + } + + /** {@inheritDoc} */ + @Override public void removeLocalMessageListener(Object topic, IgniteBiPredicate<UUID, ?> p) { + /* No-op. */ + } + + /** {@inheritDoc} */ + @Override public boolean removeMessageListener(GridMessageListener lsnr, String topic) { + return false; + } + + /** {@inheritDoc} */ + @Override public void send(ClusterNode node, Serializable msg, String topic) { + /* No-op. */ + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node) { + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node, DiscoveryDataBag discoData) { + return null; + } + + /** {@inheritDoc} */ + @Override public Collection<SecuritySubject> authenticatedSubjects() { + return null; + } + + /** {@inheritDoc} */ + @Override public SecuritySubject authenticatedSubject(UUID subjId) { + return null; + } + + /** {@inheritDoc} */ + @Override public MessageFormatter messageFormatter() { + return null; + } + + /** {@inheritDoc} */ + @Override public MessageFactory messageFactory() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isStopping() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean tryFailNode(UUID nodeId, @Nullable String warning) { + return false; + } + + /** {@inheritDoc} */ + @Override public void failNode(UUID nodeId, @Nullable String warning) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Map<String, Object> nodeAttributes() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean communicationFailureResolveSupported() { + return false; + } + + /** {@inheritDoc} */ + @Override public void resolveCommunicationFailure(ClusterNode node, Exception err) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public ReadOnlyMetricRegistry getOrCreateMetricRegistry(String name) { + return null; + } + + /** {@inheritDoc} */ + @Override public void removeMetricRegistry(String name) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Iterable<ReadOnlyMetricRegistry> metricRegistries() { + return null; + } + + /** {@inheritDoc} */ + @Override public void addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr) { + // No-op. + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java index db1a6f43a84..3350b3f3fd6 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java @@ -17,30 +17,67 @@ package org.apache.ignite.cdc; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.configuration.BinaryConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.cdc.CdcMain; +import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.processors.metric.PushMetricsExporterAdapter; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.spi.metric.MetricExporterSpi; import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi; +import org.apache.ignite.testframework.CallbackExecutorLogListener; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.apache.ignite.testframework.junits.multijvm.IgniteProcessProxy; import org.junit.Test; +import static java.util.Collections.emptyList; +import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT_RANGE; +import static org.apache.ignite.testframework.GridTestUtils.runAsync; + /** */ public class CdcPushMetricsExporterTest extends AbstractCdcTest { /** */ private final AtomicBoolean metricsExported = new AtomicBoolean(false); - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + /** */ + private ListeningTestLogger lsnrLog; + + /** */ + private IgniteConfiguration getSrcClusterCfg(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = getConfiguration(igniteInstanceName); cfg.setDataStorageConfiguration(new DataStorageConfiguration() .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT) .setDefaultDataRegionConfiguration(new DataRegionConfiguration() - .setCdcEnabled(true))); + .setCdcEnabled(true))) + .setBinaryConfiguration(new BinaryConfiguration().setCompactFooter(true)); + + return cfg; + } + + /** */ + private IgniteConfiguration getDestClusterCfg(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = getConfiguration(igniteInstanceName); + + cfg.setGridLogger(lsnrLog) + .setBinaryConfiguration(new BinaryConfiguration().setCompactFooter(true)) + .setDiscoverySpi(new TcpDiscoverySpi() + .setLocalPort(TcpDiscoverySpi.DFLT_PORT + DFLT_PORT_RANGE) + .setLocalAddress("127.0.0.1") + .setIpFinder(new TcpDiscoveryVmIpFinder() + .setAddresses(Collections.singleton("127.0.0.1:" + (TcpDiscoverySpi.DFLT_PORT + DFLT_PORT_RANGE) + + ".." + (TcpDiscoverySpi.DFLT_PORT + DFLT_PORT_RANGE + 3))) + .setShared(true))); return cfg; } @@ -61,10 +98,15 @@ public class CdcPushMetricsExporterTest extends AbstractCdcTest { }; } + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + lsnrLog = new ListeningTestLogger(log); + } + /** Test checks that metrics are exported via exporter based on the push metrics exporter adapter. */ @Test public void testPushMetricsExporter() throws Exception { - IgniteConfiguration cfg = getConfiguration("ignite-0"); + IgniteConfiguration cfg = getSrcClusterCfg("ignite-0"); Ignite ign = startGrid(cfg); @@ -84,9 +126,83 @@ public class CdcPushMetricsExporterTest extends AbstractCdcTest { assertTrue(metricsExported.get()); } + /** Test checks that consumer can start ignite client node and connect to destination cluster. */ + @Test + public void testIgniteToIgniteConsumer() throws Exception { + IgniteConfiguration srcClusterCfg = getSrcClusterCfg("src-cluster"); + startGrid(srcClusterCfg); + + final CountDownLatch destClusterStarted = new CountDownLatch(1); + + lsnrLog.registerListener(new CallbackExecutorLogListener(".*Ignite node started OK \\(id=.*, instance name=dest-cluster\\)", + destClusterStarted::countDown)); + + IgniteConfiguration destClusterCfg = getDestClusterCfg("dest-cluster"); + + IgniteProcessProxy targetIgn = startRemoteDestCluster(destClusterCfg); + + destClusterStarted.await(30, TimeUnit.SECONDS); + + final CountDownLatch cdcClientNodeJoined = new CountDownLatch(1); + + lsnrLog.registerListener(new CallbackExecutorLogListener(".*TestIgniteToIgniteConsumer started.", + cdcClientNodeJoined::countDown)); + + CdcConfiguration cdcCfg = new CdcConfiguration(); + + IgniteConfiguration destClusterCliCfg = getDestClusterCfg("cdc-client") + .setClientMode(true); + + cdcCfg.setConsumer(new TestIgniteToIgniteConsumer(destClusterCliCfg)); + + CdcMain cdc = new CdcMain(srcClusterCfg, null, cdcCfg); + + runAsync(cdc); + + assertTrue(cdcClientNodeJoined.await(30, TimeUnit.SECONDS)); + + cdc.stop(); + + targetIgn.kill(); + } + + /** + * @param destClusterCfg destination cluster config. + * @return Ignite proxy for destination cluster started in another JVM. + */ + private IgniteProcessProxy startRemoteDestCluster(IgniteConfiguration destClusterCfg) throws Exception { + return new IgniteProcessProxy( + destClusterCfg, + destClusterCfg.getGridLogger(), + null, + false, + emptyList() + ); + } + /** */ public static void addData(IgniteCache<Integer, User> cache, int from, int to) { for (int i = from; i < to; i++) cache.put(i, createUser(i)); } + + /** Test CDC consumer invoking the ignite client node. */ + public static class TestIgniteToIgniteConsumer extends UserCdcConsumer { + /** */ + private final IgniteConfiguration destClusterCliCfg; + + /** */ + public TestIgniteToIgniteConsumer(IgniteConfiguration destClusterCliCfg) { + this.destClusterCliCfg = destClusterCliCfg; + } + + /** {@inheritDoc} */ + @Override public void start(MetricRegistry mreg) { + Ignite ignite = Ignition.start(destClusterCliCfg); + + super.start(mreg); + + ignite.log().info("TestIgniteToIgniteConsumer started."); + } + } }