This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 2ebc8df1aa2 IGNITE-21678 Fix Cdc error with ignite to ignite streamer
in case BinaryConfiguration is specified in config (#11275)
2ebc8df1aa2 is described below
commit 2ebc8df1aa22ac105003360b57d83bbd0e9fc2df
Author: Sergey Korotkov <[email protected]>
AuthorDate: Thu Mar 14 15:49:46 2024 +0700
IGNITE-21678 Fix Cdc error with ignite to ignite streamer in case
BinaryConfiguration is specified in config (#11275)
---
.../org/apache/ignite/internal/cdc/CdcMain.java | 9 +-
.../wal/reader/StandaloneSpiContext.java | 250 +++++++++++++++++++++
.../ignite/cdc/CdcPushMetricsExporterTest.java | 126 ++++++++++-
3 files changed, 376 insertions(+), 9 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index a85cd9bf858..33d015bfb34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -62,6 +62,7 @@ import
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAhea
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
+import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneSpiContext;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
@@ -74,11 +75,11 @@ import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.platform.PlatformType;
+import org.apache.ignite.spi.IgniteSpi;
import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
import org.apache.ignite.spi.metric.noop.NoopMetricExporterSpi;
import org.apache.ignite.startup.cmdline.CdcCommandLineStartup;
-import static
org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
import static org.apache.ignite.internal.IgniteKernal.NL;
import static org.apache.ignite.internal.IgniteKernal.SITE;
import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
@@ -322,8 +323,6 @@ public class CdcMain implements Runnable {
throw new IllegalArgumentException(ERR_MSG);
}
- System.setProperty(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK,
"true");
-
try (CdcFileLockHolder lock = lockPds()) {
String consIdDir = cdcDir.getName(cdcDir.getNameCount() -
1).toString();
@@ -419,7 +418,9 @@ public class CdcMain implements Runnable {
startAllComponents(kctx);
- kctx.metric().onKernalStart(true);
+ for (IgniteSpi metricSpi : kctx.config().getMetricExporterSpi()) {
+ metricSpi.onContextInitialized(new StandaloneSpiContext());
+ }
mreg = kctx.metric().registry("cdc");
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneSpiContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneSpiContext.java
new file mode 100644
index 00000000000..887d35739b3
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneSpiContext.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.reader;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
+import org.apache.ignite.plugin.security.SecuritySubject;
+import org.apache.ignite.spi.IgniteNodeValidationResult;
+import org.apache.ignite.spi.IgnitePortProtocol;
+import org.apache.ignite.spi.IgniteSpiContext;
+import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Dummy SPI context for offline utilities.
+ */
+public class StandaloneSpiContext implements IgniteSpiContext {
+ /** {@inheritDoc} */
+ @Override public void addLocalEventListener(GridLocalEventListener lsnr,
int... types) {
+ /* No-op. */
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addMessageListener(GridMessageListener lsnr, String
topic) {
+ /* No-op. */
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addLocalMessageListener(Object topic,
IgniteBiPredicate<UUID, ?> p) {
+ /* No-op. */
+ }
+
+ /** {@inheritDoc} */
+ @Override public void recordEvent(Event evt) {
+ /* No-op. */
+ }
+
+ /** {@inheritDoc} */
+ @Override public void registerPort(int port, IgnitePortProtocol proto) {
+ /* No-op. */
+ }
+
+ /** {@inheritDoc} */
+ @Override public void deregisterPort(int port, IgnitePortProtocol proto) {
+ /* No-op. */
+ }
+
+ /** {@inheritDoc} */
+ @Override public void deregisterPorts() {
+ /* No-op. */
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> V get(String cacheName, K key) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> V put(String cacheName, K key, V val, long ttl) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> V putIfAbsent(String cacheName, K key, V val, long
ttl) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> V remove(String cacheName, K key) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K> boolean containsKey(String cacheName, K key) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition(String cacheName, Object key) {
+ return -1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<ClusterNode> nodes() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterNode localNode() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public ClusterNode node(UUID nodeId) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<ClusterNode> remoteNodes() {
+ return Collections.emptyList();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean pingNode(UUID nodeId) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean removeLocalEventListener(GridLocalEventListener
lsnr) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isEventRecordable(int... types) {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeLocalMessageListener(Object topic,
IgniteBiPredicate<UUID, ?> p) {
+ /* No-op. */
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean removeMessageListener(GridMessageListener lsnr,
String topic) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void send(ClusterNode node, Serializable msg, String
topic) {
+ /* No-op. */
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public IgniteNodeValidationResult
validateNode(ClusterNode node) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public IgniteNodeValidationResult
validateNode(ClusterNode node, DiscoveryDataBag discoData) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<SecuritySubject> authenticatedSubjects() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public SecuritySubject authenticatedSubject(UUID subjId) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessageFormatter messageFormatter() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public MessageFactory messageFactory() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isStopping() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean tryFailNode(UUID nodeId, @Nullable String
warning) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void failNode(UUID nodeId, @Nullable String warning) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, Object> nodeAttributes() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean communicationFailureResolveSupported() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void resolveCommunicationFailure(ClusterNode node,
Exception err) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public ReadOnlyMetricRegistry getOrCreateMetricRegistry(String
name) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeMetricRegistry(String name) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterable<ReadOnlyMetricRegistry> metricRegistries() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void
addMetricRegistryCreationListener(Consumer<ReadOnlyMetricRegistry> lsnr) {
+ // No-op.
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java
b/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java
index db1a6f43a84..3350b3f3fd6 100644
---
a/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java
@@ -17,30 +17,67 @@
package org.apache.ignite.cdc;
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.PushMetricsExporterAdapter;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.metric.MetricExporterSpi;
import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
+import org.apache.ignite.testframework.CallbackExecutorLogListener;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.junits.multijvm.IgniteProcessProxy;
import org.junit.Test;
+import static java.util.Collections.emptyList;
+import static
org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT_RANGE;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
/** */
public class CdcPushMetricsExporterTest extends AbstractCdcTest {
/** */
private final AtomicBoolean metricsExported = new AtomicBoolean(false);
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+ /** */
+ private ListeningTestLogger lsnrLog;
+
+ /** */
+ private IgniteConfiguration getSrcClusterCfg(String igniteInstanceName)
throws Exception {
+ IgniteConfiguration cfg = getConfiguration(igniteInstanceName);
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
- .setCdcEnabled(true)));
+ .setCdcEnabled(true)))
+ .setBinaryConfiguration(new
BinaryConfiguration().setCompactFooter(true));
+
+ return cfg;
+ }
+
+ /** */
+ private IgniteConfiguration getDestClusterCfg(String igniteInstanceName)
throws Exception {
+ IgniteConfiguration cfg = getConfiguration(igniteInstanceName);
+
+ cfg.setGridLogger(lsnrLog)
+ .setBinaryConfiguration(new
BinaryConfiguration().setCompactFooter(true))
+ .setDiscoverySpi(new TcpDiscoverySpi()
+ .setLocalPort(TcpDiscoverySpi.DFLT_PORT + DFLT_PORT_RANGE)
+ .setLocalAddress("127.0.0.1")
+ .setIpFinder(new TcpDiscoveryVmIpFinder()
+ .setAddresses(Collections.singleton("127.0.0.1:" +
(TcpDiscoverySpi.DFLT_PORT + DFLT_PORT_RANGE) +
+ ".." + (TcpDiscoverySpi.DFLT_PORT + DFLT_PORT_RANGE +
3)))
+ .setShared(true)));
return cfg;
}
@@ -61,10 +98,15 @@ public class CdcPushMetricsExporterTest extends
AbstractCdcTest {
};
}
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ lsnrLog = new ListeningTestLogger(log);
+ }
+
/** Test checks that metrics are exported via exporter based on the push
metrics exporter adapter. */
@Test
public void testPushMetricsExporter() throws Exception {
- IgniteConfiguration cfg = getConfiguration("ignite-0");
+ IgniteConfiguration cfg = getSrcClusterCfg("ignite-0");
Ignite ign = startGrid(cfg);
@@ -84,9 +126,83 @@ public class CdcPushMetricsExporterTest extends
AbstractCdcTest {
assertTrue(metricsExported.get());
}
+ /** Test checks that consumer can start ignite client node and connect to
destination cluster. */
+ @Test
+ public void testIgniteToIgniteConsumer() throws Exception {
+ IgniteConfiguration srcClusterCfg = getSrcClusterCfg("src-cluster");
+ startGrid(srcClusterCfg);
+
+ final CountDownLatch destClusterStarted = new CountDownLatch(1);
+
+ lsnrLog.registerListener(new CallbackExecutorLogListener(".*Ignite
node started OK \\(id=.*, instance name=dest-cluster\\)",
+ destClusterStarted::countDown));
+
+ IgniteConfiguration destClusterCfg = getDestClusterCfg("dest-cluster");
+
+ IgniteProcessProxy targetIgn = startRemoteDestCluster(destClusterCfg);
+
+ destClusterStarted.await(30, TimeUnit.SECONDS);
+
+ final CountDownLatch cdcClientNodeJoined = new CountDownLatch(1);
+
+ lsnrLog.registerListener(new
CallbackExecutorLogListener(".*TestIgniteToIgniteConsumer started.",
+ cdcClientNodeJoined::countDown));
+
+ CdcConfiguration cdcCfg = new CdcConfiguration();
+
+ IgniteConfiguration destClusterCliCfg = getDestClusterCfg("cdc-client")
+ .setClientMode(true);
+
+ cdcCfg.setConsumer(new TestIgniteToIgniteConsumer(destClusterCliCfg));
+
+ CdcMain cdc = new CdcMain(srcClusterCfg, null, cdcCfg);
+
+ runAsync(cdc);
+
+ assertTrue(cdcClientNodeJoined.await(30, TimeUnit.SECONDS));
+
+ cdc.stop();
+
+ targetIgn.kill();
+ }
+
+ /**
+ * @param destClusterCfg destination cluster config.
+ * @return Ignite proxy for destination cluster started in another JVM.
+ */
+ private IgniteProcessProxy startRemoteDestCluster(IgniteConfiguration
destClusterCfg) throws Exception {
+ return new IgniteProcessProxy(
+ destClusterCfg,
+ destClusterCfg.getGridLogger(),
+ null,
+ false,
+ emptyList()
+ );
+ }
+
/** */
public static void addData(IgniteCache<Integer, User> cache, int from, int
to) {
for (int i = from; i < to; i++)
cache.put(i, createUser(i));
}
+
+ /** Test CDC consumer invoking the ignite client node. */
+ public static class TestIgniteToIgniteConsumer extends UserCdcConsumer {
+ /** */
+ private final IgniteConfiguration destClusterCliCfg;
+
+ /** */
+ public TestIgniteToIgniteConsumer(IgniteConfiguration
destClusterCliCfg) {
+ this.destClusterCliCfg = destClusterCliCfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start(MetricRegistry mreg) {
+ Ignite ignite = Ignition.start(destClusterCliCfg);
+
+ super.start(mreg);
+
+ ignite.log().info("TestIgniteToIgniteConsumer started.");
+ }
+ }
}