This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new ffc97cddb60 IGNITE-26264 Move discoveryInternalLsnr to tests (#12287)
ffc97cddb60 is described below
commit ffc97cddb6073f88b34a9ab35ac0fc54a3a8ef9f
Author: Maksim Timonin <[email protected]>
AuthorDate: Wed Aug 27 19:04:45 2025 +0500
IGNITE-26264 Move discoveryInternalLsnr to tests (#12287)
---
.../managers/discovery/IgniteDiscoverySpi.java | 7 ----
.../discovery/isolated/IsolatedDiscoverySpi.java | 6 ----
.../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 26 ---------------
.../IgniteClientReconnectAbstractTest.java | 35 ++-----------------
.../IgniteClientReconnectApiExceptionTest.java | 4 +--
.../internal/IgniteClientReconnectCacheTest.java | 6 ++--
.../internal/IgniteClientReconnectStopTest.java | 4 +--
.../IgniteDiscoverySpiInternalListener.java | 0
.../cache/IgniteClusterActivateDeactivateTest.java | 3 +-
.../CacheLateAffinityAssignmentTest.java | 9 ++---
.../ignite/messaging/GridMessagingSelfTest.java | 8 +++--
.../discovery/DiscoverySpiDataExchangeTest.java | 6 ----
.../spi/discovery/tcp/BlockTcpDiscoverySpi.java | 2 +-
.../IgniteDiscoverySpiInternalListenerSupport.java | 30 +++++++++++++++++
...scoveryNodeAttributesUpdateOnReconnectTest.java | 3 +-
.../TcpDiscoveryReconnectUnstableTopologyTest.java | 4 +--
.../spi/discovery/tcp/TestTcpDiscoverySpi.java | 39 ++++++++++++++++++++--
.../DynamicColumnsAbstractConcurrentSelfTest.java | 3 +-
.../DynamicIndexAbstractConcurrentSelfTest.java | 3 +-
.../persistence/db/wal/IgniteWalRecoveryTest.java | 4 +--
.../spi/discovery/zk/ZookeeperDiscoverySpi.java | 30 +++++------------
.../zk/internal/ZookeeperDiscoveryImpl.java | 14 +-------
.../zk/ZookeeperDiscoverySpiTestConfigurator.java | 35 ++++++++++++++++++-
.../zk/internal/ZookeeperDiscoverySpiTestBase.java | 38 +++++++++------------
24 files changed, 155 insertions(+), 164 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
index 9aa5d140bba..54cc67ba987 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpi.java
@@ -47,13 +47,6 @@ public interface IgniteDiscoverySpi extends DiscoverySpi {
*/
public void simulateNodeFailure();
- /**
- * For TESTING only.
- *
- * @param lsnr Listener.
- */
- public void setInternalListener(IgniteDiscoverySpiInternalListener lsnr);
-
/**
* @return {@code True} if supports communication error resolve.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/isolated/IsolatedDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/isolated/IsolatedDiscoverySpi.java
index 0890729dfdc..1e0d02019fb 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/isolated/IsolatedDiscoverySpi.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/isolated/IsolatedDiscoverySpi.java
@@ -31,7 +31,6 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
-import
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.marshaller.Marshaller;
@@ -268,11 +267,6 @@ public class IsolatedDiscoverySpi extends IgniteSpiAdapter
implements IgniteDisc
// No-op.
}
- /** {@inheritDoc} */
- @Override public void
setInternalListener(IgniteDiscoverySpiInternalListener lsnr) {
- // No-op.
- }
-
/** {@inheritDoc} */
@Override public boolean supportsCommunicationFailureResolve() {
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 97d4c7e0a55..a3e4d2ae2b7 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -59,7 +59,6 @@ import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
-import
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -107,10 +106,8 @@ import
org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage;
-import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryEnsureDelivery;
-import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -452,9 +449,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
/** */
protected IgniteSpiContext spiCtx;
- /** */
- private IgniteDiscoverySpiInternalListener internalLsnr;
-
/** For test purposes. */
private boolean skipAddrsRandomization = false;
@@ -527,13 +521,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
/** {@inheritDoc} */
@Override public void sendCustomEvent(DiscoverySpiCustomMessage msg)
throws IgniteException {
- IgniteDiscoverySpiInternalListener internalLsnr = this.internalLsnr;
-
- if (internalLsnr != null) {
- if (!internalLsnr.beforeSendCustomEvent(this, log, msg))
- return;
- }
-
impl.sendCustomEvent(msg);
}
@@ -1776,14 +1763,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
OutputStream out,
TcpDiscoveryAbstractMessage msg,
long timeout) throws IOException, IgniteCheckedException {
- if (internalLsnr != null) {
- if (msg instanceof TcpDiscoveryJoinRequestMessage)
- internalLsnr.beforeJoin(locNode, log);
-
- if (msg instanceof TcpDiscoveryClientReconnectMessage)
- internalLsnr.beforeReconnect(locNode, log);
- }
-
assert sock != null;
assert msg != null;
assert out != null;
@@ -2436,11 +2415,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
sndMsgLsnrs.add(lsnr);
}
- /** {@inheritDoc} */
- @Override public void
setInternalListener(IgniteDiscoverySpiInternalListener lsnr) {
- this.internalLsnr = lsnr;
- }
-
/**
* <strong>FOR TEST ONLY!!!</strong>
*
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index a074f211560..347a48a280f 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -17,9 +17,6 @@
package org.apache.ignite.internal;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -30,7 +27,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -49,10 +45,9 @@ import
org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import
org.apache.ignite.spi.discovery.tcp.IgniteDiscoverySpiInternalListenerSupport;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
-import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
-import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
@@ -268,7 +263,7 @@ public abstract class IgniteClientReconnectAbstractTest
extends GridCommonAbstra
blockLsnrs.add(lsnr);
- spi0(client).setInternalListener(lsnr);
+
((IgniteDiscoverySpiInternalListenerSupport)spi0(client)).setInternalListener(lsnr);
}
IgnitePredicate<Event> p = new IgnitePredicate<Event>() {
@@ -398,30 +393,6 @@ public abstract class IgniteClientReconnectAbstractTest
extends GridCommonAbstra
e.reconnectFuture().get();
}
- /**
- *
- */
- public static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
- /** */
- volatile CountDownLatch writeLatch;
-
- /** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg, long timeout)
- throws IOException, IgniteCheckedException {
- if (msg instanceof TcpDiscoveryJoinRequestMessage) {
- CountDownLatch writeLatch0 = writeLatch;
-
- if (writeLatch0 != null) {
- log.info("Block join request send: " + msg);
-
- U.await(writeLatch0);
- }
- }
-
- super.writeToSocket(sock, out, msg, timeout);
- }
- }
-
/**
*
*/
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
index e361b224344..d44ded4a55b 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java
@@ -44,7 +44,6 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event;
-import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -53,6 +52,7 @@ import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.discovery.DiscoverySpi;
+import
org.apache.ignite.spi.discovery.tcp.IgniteDiscoverySpiInternalListenerSupport;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
@@ -764,7 +764,7 @@ public class IgniteClientReconnectApiExceptionTest extends
IgniteClientReconnect
throws Exception {
assertNotNull(client.cache(DEFAULT_CACHE_NAME));
- final IgniteDiscoverySpi clientSpi = spi0(client);
+ final IgniteDiscoverySpiInternalListenerSupport clientSpi =
(IgniteDiscoverySpiInternalListenerSupport)spi0(client);
Ignite srv = clientRouter(client);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index b6a5ecf4754..73aaed4cd89 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -48,7 +48,6 @@ import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
-import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
@@ -68,6 +67,7 @@ import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.DiscoverySpi;
+import
org.apache.ignite.spi.discovery.tcp.IgniteDiscoverySpiInternalListenerSupport;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
@@ -157,7 +157,7 @@ public class IgniteClientReconnectCacheTest extends
IgniteClientReconnectAbstrac
public void testReconnect() throws Exception {
IgniteEx client = startClientGrid(SRV_CNT);
- final IgniteDiscoverySpi clientSpi = spi0(client);
+ final IgniteDiscoverySpiInternalListenerSupport clientSpi =
(IgniteDiscoverySpiInternalListenerSupport)spi0(client);
Ignite srv = ignite(0);
@@ -461,7 +461,7 @@ public class IgniteClientReconnectCacheTest extends
IgniteClientReconnectAbstrac
throws Exception {
Ignite srv = ignite(0);
- final IgniteDiscoverySpi clientSpi = spi0(client);
+ final IgniteDiscoverySpiInternalListenerSupport clientSpi =
(IgniteDiscoverySpiInternalListenerSupport)spi0(client);
final DiscoverySpi srvSpi = spi0(srv);
final CountDownLatch disconnectLatch = new CountDownLatch(1);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStopTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStopTest.java
index 3b8311a2e5f..33e9b5d19a2 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStopTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStopTest.java
@@ -23,10 +23,10 @@ import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.Event;
-import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.discovery.DiscoverySpi;
+import
org.apache.ignite.spi.discovery.tcp.IgniteDiscoverySpiInternalListenerSupport;
import org.junit.Test;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
@@ -56,7 +56,7 @@ public class IgniteClientReconnectStopTest extends
IgniteClientReconnectAbstract
final CountDownLatch disconnectLatch = new CountDownLatch(1);
final CountDownLatch reconnectLatch = new CountDownLatch(1);
- final IgniteDiscoverySpi clientSpi = spi0(client);
+ final IgniteDiscoverySpiInternalListenerSupport clientSpi =
(IgniteDiscoverySpiInternalListenerSupport)spi0(client);
DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
similarity index 100%
rename from
modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
rename to
modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoverySpiInternalListener.java
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
index 9cd45a97a21..4d6535b7ce6 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
@@ -58,6 +58,7 @@ import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.mxbean.IgniteMXBean;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -109,7 +110,7 @@ public class IgniteClusterActivateDeactivateTest extends
GridCommonAbstractTest
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
if (testReconnectSpi) {
- TcpDiscoverySpi spi = new
IgniteClientReconnectAbstractTest.TestTcpDiscoverySpi();
+ TcpDiscoverySpi spi = new TestTcpDiscoverySpi();
cfg.setDiscoverySpi(spi);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index d559b82ce25..21e00d98d5a 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -61,7 +61,6 @@ import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.cluster.NodeOrderComparator;
-import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import
org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
@@ -90,7 +89,9 @@ import
org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
+import
org.apache.ignite.spi.discovery.tcp.IgniteDiscoverySpiInternalListenerSupport;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
@@ -1397,7 +1398,7 @@ public class CacheLateAffinityAssignmentTest extends
GridCommonAbstractTest {
DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();
-
((IgniteDiscoverySpi)ignite0.configuration().getDiscoverySpi()).setInternalListener(lsnr);
+
((IgniteDiscoverySpiInternalListenerSupport)ignite0.configuration().getDiscoverySpi()).setInternalListener(lsnr);
TestRecordingCommunicationSpi commSpi0 =
(TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi();
@@ -1495,7 +1496,7 @@ public class CacheLateAffinityAssignmentTest extends
GridCommonAbstractTest {
DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();
-
((IgniteDiscoverySpi)ignite0.configuration().getDiscoverySpi()).setInternalListener(lsnr);
+
((TestTcpDiscoverySpi)ignite0.configuration().getDiscoverySpi()).setInternalListener(lsnr);
TestRecordingCommunicationSpi commSpi0 =
(TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi();
@@ -1548,7 +1549,7 @@ public class CacheLateAffinityAssignmentTest extends
GridCommonAbstractTest {
DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();
-
((IgniteDiscoverySpi)ignite0.configuration().getDiscoverySpi()).setInternalListener(lsnr);
+
((TestTcpDiscoverySpi)ignite0.configuration().getDiscoverySpi()).setInternalListener(lsnr);
TestRecordingCommunicationSpi spi =
(TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi();
diff --git
a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
index 96f5ddd253b..6a7760ec558 100644
---
a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
@@ -39,7 +39,6 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteMessaging;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.internal.DiscoverySpiTestListener;
-import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage;
import
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2;
import
org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
@@ -49,6 +48,7 @@ import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.resources.IgniteInstanceResource;
+import
org.apache.ignite.spi.discovery.tcp.IgniteDiscoverySpiInternalListenerSupport;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.config.GridTestProperties;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -1028,7 +1028,8 @@ public class GridMessagingSelfTest extends
GridCommonAbstractTest implements Ser
public void testAsyncOld() throws Exception {
final AtomicInteger msgCnt = new AtomicInteger();
- IgniteDiscoverySpi discoSpi =
(IgniteDiscoverySpi)ignite2.configuration().getDiscoverySpi();
+ IgniteDiscoverySpiInternalListenerSupport discoSpi =
+
(IgniteDiscoverySpiInternalListenerSupport)ignite2.configuration().getDiscoverySpi();
DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();
@@ -1141,7 +1142,8 @@ public class GridMessagingSelfTest extends
GridCommonAbstractTest implements Ser
public void testAsync() throws Exception {
final AtomicInteger msgCnt = new AtomicInteger();
- IgniteDiscoverySpi discoSpi =
(IgniteDiscoverySpi)ignite2.configuration().getDiscoverySpi();
+ IgniteDiscoverySpiInternalListenerSupport discoSpi =
+
(IgniteDiscoverySpiInternalListenerSupport)ignite2.configuration().getDiscoverySpi();
DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener();
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java
index 6b7e213c54d..30a7ec80298 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java
@@ -30,7 +30,6 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
-import
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.spi.IgniteSpiAdapter;
@@ -217,11 +216,6 @@ public class DiscoverySpiDataExchangeTest extends
GridCommonAbstractTest {
delegate.simulateNodeFailure();
}
- /** {@inheritDoc} */
- @Override public void
setInternalListener(IgniteDiscoverySpiInternalListener lsnr) {
- delegate.setInternalListener(lsnr);
- }
-
/** {@inheritDoc} */
@Override public boolean supportsCommunicationFailureResolve() {
return delegate.supportsCommunicationFailureResolve();
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java
index 23b88a6d8e6..65688863a84 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/BlockTcpDiscoverySpi.java
@@ -35,7 +35,7 @@ import static org.junit.Assert.assertNotNull;
/**
* Custom discovery SPI allowing to block custom messages transfer between
nodes.
*/
-public class BlockTcpDiscoverySpi extends TcpDiscoverySpi {
+public class BlockTcpDiscoverySpi extends TestTcpDiscoverySpi {
/** Closure. */
private volatile IgniteBiClosure<ClusterNode, DiscoveryCustomMessage,
Void> clo;
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteDiscoverySpiInternalListenerSupport.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteDiscoverySpiInternalListenerSupport.java
new file mode 100644
index 00000000000..ccd6ec1b61d
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteDiscoverySpiInternalListenerSupport.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
+import
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
+
+/**
+ * Interface providing support for setting {@link
IgniteDiscoverySpiInternalListener} for different {@link IgniteDiscoverySpi}
+ * implementations.
+ */
+public interface IgniteDiscoverySpiInternalListenerSupport {
+ /** */
+ public void setInternalListener(IgniteDiscoverySpiInternalListener lsnr);
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeAttributesUpdateOnReconnectTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeAttributesUpdateOnReconnectTest.java
index 55319b9f90d..a9c0ac85d07 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeAttributesUpdateOnReconnectTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryNodeAttributesUpdateOnReconnectTest.java
@@ -27,7 +27,6 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
-import org.apache.ignite.internal.IgniteClientReconnectAbstractTest;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.discovery.TestReconnectSecurityPluginProvider;
@@ -60,7 +59,7 @@ public class TcpDiscoveryNodeAttributesUpdateOnReconnectTest
extends GridCommonA
cfg.setUserAttributes(attrs);
}
- IgniteClientReconnectAbstractTest.TestTcpDiscoverySpi spi = new
IgniteClientReconnectAbstractTest.TestTcpDiscoverySpi();
+ TestTcpDiscoverySpi spi = new TestTcpDiscoverySpi();
TcpDiscoveryIpFinder finder =
((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder();
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryReconnectUnstableTopologyTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryReconnectUnstableTopologyTest.java
index 474ef6c0346..d487e0dfb01 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryReconnectUnstableTopologyTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryReconnectUnstableTopologyTest.java
@@ -133,7 +133,7 @@ public class TcpDiscoveryReconnectUnstableTopologyTest
extends GridCommonAbstrac
/**
* @param ig Ignite.
*/
- private TcpDiscoverySpi spi(Ignite ig) {
- return (TcpDiscoverySpi)ig.configuration().getDiscoverySpi();
+ private TestTcpDiscoverySpi spi(Ignite ig) {
+ return (TestTcpDiscoverySpi)ig.configuration().getDiscoverySpi();
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
index 00ae982261c..669b3112fd8 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TestTcpDiscoverySpi.java
@@ -21,8 +21,13 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage;
+import
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
import org.apache.ignite.testframework.GridTestUtils.DiscoveryHook;
import org.jetbrains.annotations.Nullable;
@@ -32,20 +37,31 @@ import static
org.apache.ignite.testframework.GridTestUtils.DiscoverySpiListener
/**
*
*/
-public class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+public class TestTcpDiscoverySpi extends TcpDiscoverySpi implements
IgniteDiscoverySpiInternalListenerSupport {
/** */
public boolean ignorePingResponse;
/** Interceptor of discovery messages. */
private DiscoveryHook discoHook;
+ /** */
+ private IgniteDiscoverySpiInternalListener internalLsnr;
+
/** {@inheritDoc} */
@Override protected void writeToSocket(Socket sock, OutputStream out,
TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
IgniteCheckedException {
if (msg instanceof TcpDiscoveryPingResponse && ignorePingResponse)
return;
- else
- super.writeToSocket(sock, out, msg, timeout);
+
+ if (internalLsnr != null) {
+ if (msg instanceof TcpDiscoveryJoinRequestMessage)
+ internalLsnr.beforeJoin(locNode, log);
+
+ if (msg instanceof TcpDiscoveryClientReconnectMessage)
+ internalLsnr.beforeReconnect(locNode, log);
+ }
+
+ super.writeToSocket(sock, out, msg, timeout);
}
/** {@inheritDoc} */
@@ -58,6 +74,23 @@ public class TestTcpDiscoverySpi extends TcpDiscoverySpi {
super.setListener(lsnr == null || discoHook == null ? lsnr :
wrap(lsnr, discoHook));
}
+ /** {@inheritDoc} */
+ @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg)
throws IgniteException {
+ // Work on a local copy so that the null check and the call use the same listener instance.
+ IgniteDiscoverySpiInternalListener internalLsnr = this.internalLsnr;
+
+ if (internalLsnr != null) {
+ // The message is not sent when the listener returns false.
+ if (!internalLsnr.beforeSendCustomEvent(this, log, msg))
+ return;
+ }
+
+ super.sendCustomEvent(msg);
+ }
+
+ /**
+ * Stores the internal listener that this SPI notifies. With {@code null} the notifications are skipped.
+ *
+ * @param lsnr Internal listener, may be {@code null}.
+ */
+ @Override public void
setInternalListener(IgniteDiscoverySpiInternalListener lsnr) {
+ internalLsnr = lsnr;
+ }
+
/**
* Sets the interceptor of discovery messages. Note that the {@link DiscoveryHook}
must be set before the SPI starts.
* Otherwise, this method call will have no effect.
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
index 352f1dfa4ed..518a7992a2d 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java
@@ -67,11 +67,10 @@ import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
-import static
org.apache.ignite.internal.IgniteClientReconnectAbstractTest.TestTcpDiscoverySpi;
-
/**
* Concurrency tests for dynamic index create/drop.
*/
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java
index a576b769970..00fe5dffedc 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java
@@ -60,12 +60,11 @@ import org.apache.ignite.internal.util.lang.RunnableX;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
-import static
org.apache.ignite.internal.IgniteClientReconnectAbstractTest.TestTcpDiscoverySpi;
-
/**
* Concurrency tests for dynamic index create/drop.
*/
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
index d2d39a50e11..c200d7d7652 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
@@ -64,7 +64,6 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageUtils;
@@ -118,6 +117,7 @@ import org.apache.ignite.lifecycle.LifecycleEventType;
import org.apache.ignite.loadtests.colocation.GridTestLifecycleBean;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.resources.IgniteInstanceResource;
+import
org.apache.ignite.spi.discovery.tcp.IgniteDiscoverySpiInternalListenerSupport;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.junits.WithSystemProperty;
@@ -597,7 +597,7 @@ public class IgniteWalRecoveryTest extends
GridCommonAbstractTest {
final IgniteConfiguration onJoinCfg =
optimize(getConfiguration(ig2Name));
// Check that restore is called before PME and before the node joins the cluster.
- ((IgniteDiscoverySpi)onJoinCfg.getDiscoverySpi())
+
((IgniteDiscoverySpiInternalListenerSupport)onJoinCfg.getDiscoverySpi())
.setInternalListener(new DiscoverySpiTestListener() {
@Override public void beforeJoin(ClusterNode locNode,
IgniteLogger log) {
String nodeName =
locNode.attribute(ATTR_IGNITE_INSTANCE_NAME);
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
index d7a6e75e7a7..be157c62fce 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpi.java
@@ -30,7 +30,6 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
-import
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -136,10 +135,7 @@ public class ZookeeperDiscoverySpi extends
IgniteSpiAdapter implements IgniteDis
/** */
@LoggerResource
@GridToStringExclude
- private IgniteLogger log;
-
- /** */
- private IgniteDiscoverySpiInternalListener internalLsnr;
+ protected IgniteLogger log;
/** */
private final ZookeeperDiscoveryStatistics stats = new
ZookeeperDiscoveryStatistics();
@@ -408,13 +404,6 @@ public class ZookeeperDiscoverySpi extends
IgniteSpiAdapter implements IgniteDis
/** {@inheritDoc} */
@Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) {
- IgniteDiscoverySpiInternalListener internalLsnr = impl.internalLsnr;
-
- if (internalLsnr != null) {
- if (!internalLsnr.beforeSendCustomEvent(this, log, msg))
- return;
- }
-
impl.sendCustomMessage(msg);
}
@@ -469,7 +458,6 @@ public class ZookeeperDiscoverySpi extends IgniteSpiAdapter
implements IgniteDis
locNode,
lsnr,
exchange,
- internalLsnr,
stats,
((IgniteEx)ignite).context().marshallerContext().jdkMarshaller());
@@ -496,14 +484,6 @@ public class ZookeeperDiscoverySpi extends
IgniteSpiAdapter implements IgniteDis
discoReg.register("Coordinator", () -> impl.getCoordinator(),
UUID.class, "Coordinator ID");
}
- /** {@inheritDoc} */
- @Override public void
setInternalListener(IgniteDiscoverySpiInternalListener lsnr) {
- if (impl != null)
- impl.internalLsnr = lsnr;
- else
- internalLsnr = lsnr;
- }
-
/** {@inheritDoc} */
@Override public void simulateNodeFailure() {
impl.simulateNodeFailure();
@@ -524,6 +504,14 @@ public class ZookeeperDiscoverySpi extends
IgniteSpiAdapter implements IgniteDis
return locNodeAttrs;
}
+ /**
+ * Callback invoked before the local node joins the topology. The default implementation does nothing;
+ * subclasses may override it.
+ *
+ * @param locNode Local node.
+ */
+ public void beforeJoinTopology(ClusterNode locNode) {
+ // No-op.
+ }
+
/**
* @return Local node instance.
*/
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 581d7132bd1..37039dd028b 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -62,7 +62,6 @@ import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.GridSpinBusyLock;
@@ -205,9 +204,6 @@ public class ZookeeperDiscoveryImpl {
/** */
private final Object stateMux = new Object();
- /** */
- public volatile IgniteDiscoverySpiInternalListener internalLsnr;
-
/** */
private final ConcurrentHashMap<Long, PingFuture> pingFuts = new
ConcurrentHashMap<>();
@@ -228,7 +224,6 @@ public class ZookeeperDiscoveryImpl {
* @param locNode Local node instance.
* @param lsnr Discovery events listener.
* @param exchange Discovery data exchange.
- * @param internalLsnr Internal listener (used for testing only).
* @param stats Zookeeper DiscoverySpi statistics collector.
* @param marsh Marshaller.
*/
@@ -240,7 +235,6 @@ public class ZookeeperDiscoveryImpl {
ZookeeperClusterNode locNode,
DiscoverySpiListener lsnr,
DiscoverySpiDataExchange exchange,
- IgniteDiscoverySpiInternalListener internalLsnr,
ZookeeperDiscoveryStatistics stats,
JdkMarshaller marsh
) {
@@ -267,9 +261,6 @@ public class ZookeeperDiscoveryImpl {
this.evtsAckThreshold = evtsAckThreshold;
- if (internalLsnr != null)
- this.internalLsnr = internalLsnr;
-
this.stats = stats;
}
@@ -792,10 +783,7 @@ public class ZookeeperDiscoveryImpl {
// Need fire EVT_CLIENT_NODE_RECONNECTED event if reconnect after
already joined.
boolean reconnect = locNode.isClient() && prevState != null &&
(prevState.joined || prevState.reconnect);
- IgniteDiscoverySpiInternalListener internalLsnr =
this.internalLsnr;
-
- if (internalLsnr != null)
- internalLsnr.beforeJoin(locNode, log);
+ spi.beforeJoinTopology(locNode);
if (reconnect)
locNode.setAttributes(spi.getLocNodeAttrs());
diff --git
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestConfigurator.java
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestConfigurator.java
index 541d6aaaf4f..8a2f4e6c84c 100644
---
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestConfigurator.java
+++
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/ZookeeperDiscoverySpiTestConfigurator.java
@@ -20,8 +20,12 @@ package org.apache.ignite.spi.discovery.zk;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.curator.test.TestingCluster;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
+import
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import
org.apache.ignite.spi.discovery.tcp.IgniteDiscoverySpiInternalListenerSupport;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.config.GridTestProperties;
@@ -69,7 +73,7 @@ public class ZookeeperDiscoverySpiTestConfigurator {
if (testingCluster == null)
throw new IllegalStateException("Test Zookeeper cluster is not
started.");
- ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();
+ ZookeeperDiscoverySpi zkSpi = new TestZookeeperDiscoverySpi();
DiscoverySpi spi = cfg.getDiscoverySpi();
@@ -85,4 +89,33 @@ public class ZookeeperDiscoverySpiTestConfigurator {
lock.unlock();
}
}
+
+ /**
+ * Test {@link ZookeeperDiscoverySpi} that passes custom event and join callbacks to an optional
+ * {@link IgniteDiscoverySpiInternalListener}.
+ */
+ private static class TestZookeeperDiscoverySpi extends
ZookeeperDiscoverySpi implements IgniteDiscoverySpiInternalListenerSupport {
+ /** Internal listener, {@code null} when no listener is set. */
+ private volatile IgniteDiscoverySpiInternalListener internalLsnr;
+
+ /** {@inheritDoc} */
+ @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) {
+ IgniteDiscoverySpiInternalListener internalLsnr =
this.internalLsnr;
+
+ // The message is not sent when the listener returns false.
+ if (internalLsnr != null &&
!internalLsnr.beforeSendCustomEvent(this, log, msg))
+ return;
+
+ super.sendCustomEvent(msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void beforeJoinTopology(ClusterNode locNode) {
+ IgniteDiscoverySpiInternalListener internalLsnr =
this.internalLsnr;
+
+ if (internalLsnr != null)
+ internalLsnr.beforeJoin(locNode, log);
+ }
+
+ /**
+ * Stores the internal listener that this SPI notifies. With {@code null} the notifications are skipped.
+ *
+ * @param lsnr Internal listener, may be {@code null}.
+ */
+ @Override public void
setInternalListener(IgniteDiscoverySpiInternalListener lsnr) {
+ internalLsnr = lsnr;
+ }
+ }
}
diff --git
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java
index fbcd959fe63..0f65306eac6 100644
---
a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java
+++
b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java
@@ -41,7 +41,6 @@ import org.apache.curator.test.TestingCluster;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -58,7 +57,6 @@ import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
-import
org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener;
import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
@@ -73,10 +71,9 @@ import
org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.security.SecurityCredentials;
import org.apache.ignite.plugin.segmentation.SegmentationPolicy;
import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.communication.tcp.internal.GridNioServerWrapper;
-import org.apache.ignite.spi.discovery.DiscoverySpi;
-import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpi;
import org.apache.ignite.spi.discovery.zk.ZookeeperDiscoverySpiTestUtil;
@@ -379,7 +376,7 @@ class ZookeeperDiscoverySpiTestBase extends
GridCommonAbstractTest {
if (!dfltConsistenId)
cfg.setConsistentId(igniteInstanceName);
- ZookeeperDiscoverySpi zkSpi = new ZookeeperDiscoverySpi();
+ ZookeeperDiscoverySpi zkSpi = auth != null ? new
TestAuthZookeeperDiscoverySpi() : new ZookeeperDiscoverySpi();
if (joinTimeout != 0)
zkSpi.setJoinTimeout(joinTimeout);
@@ -389,26 +386,9 @@ class ZookeeperDiscoverySpiTestBase extends
GridCommonAbstractTest {
zkSpi.setClientReconnectDisabled(clientReconnectDisabled);
// Set authenticator for basic sanity tests.
- if (auth != null) {
+ if (auth != null)
zkSpi.setAuthenticator(auth.apply());
- zkSpi.setInternalListener(new IgniteDiscoverySpiInternalListener()
{
- @Override public void beforeJoin(ClusterNode locNode,
IgniteLogger log) {
- ZookeeperClusterNode locNode0 =
(ZookeeperClusterNode)locNode;
-
- Map<String, Object> attrs = new
HashMap<>(locNode0.getAttributes());
-
- attrs.put(ATTR_SECURITY_CREDENTIALS, new
SecurityCredentials(null, null, igniteInstanceName));
-
- locNode0.setAttributes(attrs);
- }
-
- @Override public boolean beforeSendCustomEvent(DiscoverySpi
spi, IgniteLogger log, DiscoverySpiCustomMessage msg) {
- return false;
- }
- });
- }
-
spis.put(igniteInstanceName, zkSpi);
if (USE_TEST_CLUSTER) {
@@ -940,4 +920,16 @@ class ZookeeperDiscoverySpiTestBase extends
GridCommonAbstractTest {
return false;
}
}
+
+ /** Zookeeper discovery SPI that registers security credentials as a node attribute before it starts. */
+ private static class TestAuthZookeeperDiscoverySpi extends
ZookeeperDiscoverySpi {
+ /** {@inheritDoc} */
+ @Override public void spiStart(@Nullable String igniteInstanceName)
throws IgniteSpiException {
+ // Add credentials built from the instance name to the node attributes, then start the parent SPI.
+ ((IgniteEx)ignite).context().addNodeAttribute(
+ ATTR_SECURITY_CREDENTIALS,
+ new SecurityCredentials(null, null, igniteInstanceName));
+
+ super.spiStart(igniteInstanceName);
+ }
+ }
}