This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ebc8df1aa2 IGNITE-21678 Fix Cdc error with ignite to ignite streamer in case BinaryConfiguration is specified in config (#11275)
2ebc8df1aa2 is described below

commit 2ebc8df1aa22ac105003360b57d83bbd0e9fc2df
Author: Sergey Korotkov <serge.korot...@gmail.com>
AuthorDate: Thu Mar 14 15:49:46 2024 +0700

    IGNITE-21678 Fix Cdc error with ignite to ignite streamer in case BinaryConfiguration is specified in config (#11275)
---
 .../org/apache/ignite/internal/cdc/CdcMain.java    |   9 +-
 .../wal/reader/StandaloneSpiContext.java           | 250 +++++++++++++++++++++
 .../ignite/cdc/CdcPushMetricsExporterTest.java     | 126 ++++++++++-
 3 files changed, 376 insertions(+), 9 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index a85cd9bf858..33d015bfb34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAhea
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
 import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneSpiContext;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
 import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
@@ -74,11 +75,11 @@ import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.platform.PlatformType;
+import org.apache.ignite.spi.IgniteSpi;
 import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
 import org.apache.ignite.spi.metric.noop.NoopMetricExporterSpi;
 import org.apache.ignite.startup.cmdline.CdcCommandLineStartup;
 
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
 import static org.apache.ignite.internal.IgniteKernal.NL;
 import static org.apache.ignite.internal.IgniteKernal.SITE;
 import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
@@ -322,8 +323,6 @@ public class CdcMain implements Runnable {
             throw new IllegalArgumentException(ERR_MSG);
         }
 
-        System.setProperty(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK, "true");
-
         try (CdcFileLockHolder lock = lockPds()) {
             String consIdDir = cdcDir.getName(cdcDir.getNameCount() - 1).toString();
 
@@ -419,7 +418,9 @@ public class CdcMain implements Runnable {
 
         startAllComponents(kctx);
 
-        kctx.metric().onKernalStart(true);
+        for (IgniteSpi metricSpi : kctx.config().getMetricExporterSpi()) {
+            metricSpi.onContextInitialized(new StandaloneSpiContext());
+        }
 
         mreg = kctx.metric().registry("cdc");
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneSpiContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneSpiContext.java
new file mode 100644
index 00000000000..887d35739b3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneSpiContext.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.reader;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
+import org.apache.ignite.plugin.security.SecuritySubject;
+import org.apache.ignite.spi.IgniteNodeValidationResult;
+import org.apache.ignite.spi.IgnitePortProtocol;
+import org.apache.ignite.spi.IgniteSpiContext;
+import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Dummy SPI context for offline utilities.
+ */
+public class StandaloneSpiContext implements IgniteSpiContext {
+    /** {@inheritDoc} */
+    @Override public void addLocalEventListener(GridLocalEventListener lsnr, int... types) {
+        /* No-op. */
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addMessageListener(GridMessageListener lsnr, String topic) {
+        /* No-op. */
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addLocalMessageListener(Object topic, IgniteBiPredicate<UUID, ?> p) {
+        /* No-op. */
+    }
+
+    /** {@inheritDoc} */
+    @Override public void recordEvent(Event evt) {
+        /* No-op. */
+    }
+
+    /** {@inheritDoc} */
+    @Override public void registerPort(int port, IgnitePortProtocol proto) {
+        /* No-op. */
+    }
+
+    /** {@inheritDoc} */
+    @Override public void deregisterPort(int port, IgnitePortProtocol proto) {
+        /* No-op. */
+    }
+
+    /** {@inheritDoc} */
+    @Override public void deregisterPorts() {
+        /* No-op. */
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> V get(String cacheName, K key) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> V put(String cacheName, K key, V val, long ttl) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> V putIfAbsent(String cacheName, K key, V val, long ttl) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> V remove(String cacheName, K key) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K> boolean containsKey(String cacheName, K key) {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partition(String cacheName, Object key) {
+        return -1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<ClusterNode> nodes() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterNode localNode() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public ClusterNode node(UUID nodeId) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<ClusterNode> remoteNodes() {
+        return Collections.emptyList();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean pingNode(UUID nodeId) {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean removeLocalEventListener(GridLocalEventListener lsnr) {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isEventRecordable(int... types) {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeLocalMessageListener(Object topic, IgniteBiPredicate<UUID, ?> p) {
+        /* No-op. */
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean removeMessageListener(GridMessageListener lsnr, String topic) {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void send(ClusterNode node, Serializable msg, String topic) {
+        /* No-op. */
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node, DiscoveryDataBag discoData) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<SecuritySubject> authenticatedSubjects() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public SecuritySubject authenticatedSubject(UUID subjId) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public MessageFormatter messageFormatter() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public MessageFactory messageFactory() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isStopping() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean tryFailNode(UUID nodeId, @Nullable String warning) {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void failNode(UUID nodeId, @Nullable String warning) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, Object> nodeAttributes() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean communicationFailureResolveSupported() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resolveCommunicationFailure(ClusterNode node, Exception err) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ReadOnlyMetricRegistry getOrCreateMetricRegistry(String name) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeMetricRegistry(String name) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public Iterable<ReadOnlyMetricRegistry> metricRegistries() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr) {
+        // No-op.
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java
index db1a6f43a84..3350b3f3fd6 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java
@@ -17,30 +17,67 @@
 
 package org.apache.ignite.cdc;
 
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.BinaryConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.PushMetricsExporterAdapter;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.spi.metric.MetricExporterSpi;
 import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
+import org.apache.ignite.testframework.CallbackExecutorLogListener;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.junits.multijvm.IgniteProcessProxy;
 import org.junit.Test;
 
+import static java.util.Collections.emptyList;
+import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT_RANGE;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
 /** */
 public class CdcPushMetricsExporterTest extends AbstractCdcTest {
     /** */
     private final AtomicBoolean metricsExported = new AtomicBoolean(false);
 
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+    /** */
+    private ListeningTestLogger lsnrLog;
+
+    /** */
+    private IgniteConfiguration getSrcClusterCfg(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = getConfiguration(igniteInstanceName);
 
         cfg.setDataStorageConfiguration(new DataStorageConfiguration()
             .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
             .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
-                .setCdcEnabled(true)));
+                .setCdcEnabled(true)))
+            .setBinaryConfiguration(new BinaryConfiguration().setCompactFooter(true));
+
+        return cfg;
+    }
+
+    /** */
+    private IgniteConfiguration getDestClusterCfg(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = getConfiguration(igniteInstanceName);
+
+        cfg.setGridLogger(lsnrLog)
+            .setBinaryConfiguration(new BinaryConfiguration().setCompactFooter(true))
+            .setDiscoverySpi(new TcpDiscoverySpi()
+                .setLocalPort(TcpDiscoverySpi.DFLT_PORT + DFLT_PORT_RANGE)
+                .setLocalAddress("127.0.0.1")
+                .setIpFinder(new TcpDiscoveryVmIpFinder()
+                    .setAddresses(Collections.singleton("127.0.0.1:" + (TcpDiscoverySpi.DFLT_PORT + DFLT_PORT_RANGE) +
+                        ".." + (TcpDiscoverySpi.DFLT_PORT + DFLT_PORT_RANGE + 3)))
+                    .setShared(true)));
 
         return cfg;
     }
@@ -61,10 +98,15 @@ public class CdcPushMetricsExporterTest extends AbstractCdcTest {
         };
     }
 
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        lsnrLog = new ListeningTestLogger(log);
+    }
+
     /** Test checks that metrics are exported via exporter based on the push metrics exporter adapter. */
     @Test
     public void testPushMetricsExporter() throws Exception {
-        IgniteConfiguration cfg = getConfiguration("ignite-0");
+        IgniteConfiguration cfg = getSrcClusterCfg("ignite-0");
 
         Ignite ign = startGrid(cfg);
 
@@ -84,9 +126,83 @@ public class CdcPushMetricsExporterTest extends AbstractCdcTest {
         assertTrue(metricsExported.get());
     }
 
+    /** Test checks that consumer can start ignite client node and connect to destination cluster. */
+    @Test
+    public void testIgniteToIgniteConsumer() throws Exception {
+        IgniteConfiguration srcClusterCfg = getSrcClusterCfg("src-cluster");
+        startGrid(srcClusterCfg);
+
+        final CountDownLatch destClusterStarted = new CountDownLatch(1);
+
+        lsnrLog.registerListener(new CallbackExecutorLogListener(".*Ignite node started OK \\(id=.*, instance name=dest-cluster\\)",
+            destClusterStarted::countDown));
+
+        IgniteConfiguration destClusterCfg = getDestClusterCfg("dest-cluster");
+
+        IgniteProcessProxy targetIgn = startRemoteDestCluster(destClusterCfg);
+
+        destClusterStarted.await(30, TimeUnit.SECONDS);
+
+        final CountDownLatch cdcClientNodeJoined = new CountDownLatch(1);
+
+        lsnrLog.registerListener(new CallbackExecutorLogListener(".*TestIgniteToIgniteConsumer started.",
+            cdcClientNodeJoined::countDown));
+
+        CdcConfiguration cdcCfg = new CdcConfiguration();
+
+        IgniteConfiguration destClusterCliCfg = getDestClusterCfg("cdc-client")
+            .setClientMode(true);
+
+        cdcCfg.setConsumer(new TestIgniteToIgniteConsumer(destClusterCliCfg));
+
+        CdcMain cdc = new CdcMain(srcClusterCfg, null, cdcCfg);
+
+        runAsync(cdc);
+
+        assertTrue(cdcClientNodeJoined.await(30, TimeUnit.SECONDS));
+
+        cdc.stop();
+
+        targetIgn.kill();
+    }
+
+    /**
+     * @param destClusterCfg destination cluster config.
+     * @return Ignite proxy for destination cluster started in another JVM.
+     */
+    private IgniteProcessProxy startRemoteDestCluster(IgniteConfiguration destClusterCfg) throws Exception {
+        return new IgniteProcessProxy(
+            destClusterCfg,
+            destClusterCfg.getGridLogger(),
+            null,
+            false,
+            emptyList()
+        );
+    }
+
     /** */
     public static void addData(IgniteCache<Integer, User> cache, int from, int to) {
         for (int i = from; i < to; i++)
             cache.put(i, createUser(i));
     }
+
+    /** Test CDC consumer invoking the ignite client node. */
+    public static class TestIgniteToIgniteConsumer extends UserCdcConsumer {
+        /** */
+        private final IgniteConfiguration destClusterCliCfg;
+
+        /** */
+        public TestIgniteToIgniteConsumer(IgniteConfiguration destClusterCliCfg) {
+            this.destClusterCliCfg = destClusterCliCfg;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void start(MetricRegistry mreg) {
+            Ignite ignite = Ignition.start(destClusterCliCfg);
+
+            super.start(mreg);
+
+            ignite.log().info("TestIgniteToIgniteConsumer started.");
+        }
+    }
 }

Reply via email to