This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 8eda6173fb8 IGNITE-28052 Use MessageSerializer for InitMessage (#12912)
8eda6173fb8 is described below
commit 8eda6173fb806f31372d2cd152fce9876dc3e964
Author: Nikita Amelchev <[email protected]>
AuthorDate: Fri Mar 20 22:52:46 2026 +0300
IGNITE-28052 Use MessageSerializer for InitMessage (#12912)
---
.../discovery/DiscoveryMessageFactory.java | 24 +++++
.../encryption/ChangeCacheEncryptionRequest.java | 35 +++++--
.../managers/encryption/GridEncryptionManager.java | 71 --------------
.../encryption/MasterKeyChangeRequest.java | 107 +++++++++++++++++++++
.../snapshot/AbstractSnapshotOperationRequest.java | 36 ++++---
.../snapshot/IgniteSnapshotManager.java | 35 -------
.../snapshot/SnapshotCheckProcessRequest.java | 24 +++--
.../snapshot/SnapshotOperationEndRequest.java | 33 +++++--
.../snapshot/SnapshotOperationRequest.java | 41 +++++---
.../snapshot/SnapshotRestoreProcess.java | 70 +++++++-------
.../snapshot/SnapshotRestoreStartRequest.java | 56 +++++++++++
.../snapshot/SnapshotStartDiscoveryMessage.java | 71 ++++++++++++++
.../PerformanceStatisticsProcessor.java | 3 +-
.../util/distributed/DistributedProcess.java | 3 +-
.../internal/util/distributed/InitMessage.java | 34 +++++--
.../encryption/CacheGroupKeyChangeTest.java | 4 +-
.../snapshot/IgniteSnapshotManagerSelfTest.java | 2 +-
.../DistributedProcessClientAwaitTest.java | 24 ++---
.../DistributedProcessCoordinatorLeftTest.java | 8 +-
.../DistributedProcessErrorHandlingTest.java | 6 +-
20 files changed, 460 insertions(+), 227 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
index 69800247c4b..e30853967bb 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -19,6 +19,10 @@ package org.apache.ignite.internal.managers.discovery;
import org.apache.ignite.internal.managers.communication.ErrorMessage;
import
org.apache.ignite.internal.managers.communication.ErrorMessageMarshallableSerializer;
+import
org.apache.ignite.internal.managers.encryption.ChangeCacheEncryptionRequest;
+import
org.apache.ignite.internal.managers.encryption.ChangeCacheEncryptionRequestSerializer;
+import org.apache.ignite.internal.managers.encryption.MasterKeyChangeRequest;
+import
org.apache.ignite.internal.managers.encryption.MasterKeyChangeRequestSerializer;
import org.apache.ignite.internal.processors.authentication.User;
import
org.apache.ignite.internal.processors.authentication.UserAcceptedMessage;
import
org.apache.ignite.internal.processors.authentication.UserAcceptedMessageSerializer;
@@ -55,18 +59,28 @@ import
org.apache.ignite.internal.processors.cache.persistence.snapshot.Snapshot
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckHandlersResponseSerializer;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckPartitionHashesResponse;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckPartitionHashesResponseMarshallableSerializer;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckProcessRequest;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckProcessRequestSerializer;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckResponse;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCheckResponseSerializer;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerResult;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerResultSerializer;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadataResponse;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadataResponseMarshallableSerializer;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperationEndRequest;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperationEndRequestSerializer;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperationRequest;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperationRequestSerializer;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperationResponse;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperationResponseSerializer;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyHandlerResponse;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyHandlerResponseMarshallableSerializer;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreOperationResponse;
import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreOperationResponseMarshallableSerializer;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreStartRequest;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreStartRequestSerializer;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotStartDiscoveryMessage;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotStartDiscoveryMessageSerializer;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import
org.apache.ignite.internal.processors.cache.version.GridCacheVersionSerializer;
import
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
@@ -107,6 +121,8 @@ import
org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexD
import
org.apache.ignite.internal.processors.query.schema.operation.SchemaIndexDropOperationSerializer;
import org.apache.ignite.internal.util.distributed.FullMessage;
import org.apache.ignite.internal.util.distributed.FullMessageSerializer;
+import org.apache.ignite.internal.util.distributed.InitMessage;
+import org.apache.ignite.internal.util.distributed.InitMessageSerializer;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
@@ -264,6 +280,14 @@ public class DiscoveryMessageFactory implements
MessageFactoryProvider {
factory.register((short)29, TcpDiscoveryNodeAddedMessage::new,
new TcpDiscoveryNodeAddedMessageMarshallableSerializer(marsh,
clsLdr));
factory.register((short)30, FullMessage::new, new
FullMessageSerializer());
+ factory.register((short)31, InitMessage::new, new
InitMessageSerializer());
+ factory.register((short)32, SnapshotStartDiscoveryMessage::new, new
SnapshotStartDiscoveryMessageSerializer());
+ factory.register((short)33, SnapshotCheckProcessRequest::new, new
SnapshotCheckProcessRequestSerializer());
+ factory.register((short)34, SnapshotOperationRequest::new, new
SnapshotOperationRequestSerializer());
+ factory.register((short)35, MasterKeyChangeRequest::new, new
MasterKeyChangeRequestSerializer());
+ factory.register((short)36, SnapshotOperationEndRequest::new, new
SnapshotOperationEndRequestSerializer());
+ factory.register((short)37, SnapshotRestoreStartRequest::new, new
SnapshotRestoreStartRequestSerializer());
+ factory.register((short)38, ChangeCacheEncryptionRequest::new, new
ChangeCacheEncryptionRequestSerializer());
factory.register((short)86, GridCacheVersion::new, new
GridCacheVersionSerializer());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/ChangeCacheEncryptionRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/ChangeCacheEncryptionRequest.java
index 4498c47d8d9..abd75f6c8c7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/ChangeCacheEncryptionRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/ChangeCacheEncryptionRequest.java
@@ -17,32 +17,41 @@
package org.apache.ignite.internal.managers.encryption;
-import java.io.Serializable;
import java.util.Objects;
import java.util.UUID;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
/**
* Change cache group encryption key request.
*/
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
-public class ChangeCacheEncryptionRequest implements Serializable {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
+public class ChangeCacheEncryptionRequest implements Message {
/** Request ID. */
- private final UUID reqId = UUID.randomUUID();
+ @Order(0)
+ UUID reqId;
/** Cache group IDs. */
- private final int[] grpIds;
+ @Order(1)
+ int[] grpIds;
/** Encryption keys. */
- private final byte[][] keys;
+ @Order(2)
+ byte[][] keys;
/** Key identifiers. */
- private final byte[] keyIds;
+ @Order(3)
+ byte[] keyIds;
/** Master key digest. */
- private final byte[] masterKeyDigest;
+ @Order(4)
+ byte[] masterKeyDigest;
+
+ /** Default constructor for {@link MessageFactory}. */
+ public ChangeCacheEncryptionRequest() {
+ // No-op.
+ }
/**
* @param grpIds Cache group IDs.
@@ -51,6 +60,7 @@ public class ChangeCacheEncryptionRequest implements
Serializable {
* @param masterKeyDigest Master key digest.
*/
public ChangeCacheEncryptionRequest(int[] grpIds, byte[][] keys, byte[]
keyIds, byte[] masterKeyDigest) {
+ this.reqId = UUID.randomUUID();
this.grpIds = grpIds;
this.keys = keys;
this.keyIds = keyIds;
@@ -103,6 +113,11 @@ public class ChangeCacheEncryptionRequest implements
Serializable {
return Objects.equals(reqId, ((ChangeCacheEncryptionRequest)o).reqId);
}
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 38;
+ }
+
/** {@inheritDoc} */
@Override public int hashCode() {
return Objects.hash(reqId);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
index c5a217d3725..e20b1effd18 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
@@ -1748,77 +1748,6 @@ public class GridEncryptionManager extends
GridManagerAdapter<EncryptionSpi> imp
});
}
- /** Master key change request. */
- private static class MasterKeyChangeRequest implements Serializable {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
- /** Request id. */
- private final UUID reqId;
-
- /** Encrypted master key name. */
- private final byte[] encKeyName;
-
- /** Master key digest. */
- private final byte[] digest;
-
- /**
- * @param reqId Request id.
- * @param encKeyName Encrypted master key name.
- * @param digest Master key digest.
- */
- private MasterKeyChangeRequest(UUID reqId, byte[] encKeyName, byte[]
digest) {
- this.reqId = reqId;
- this.encKeyName = encKeyName;
- this.digest = digest;
- }
-
- /** @return Request id. */
- UUID requestId() {
- return reqId;
- }
-
- /** @return Encrypted master key name. */
- byte[] encKeyName() {
- return encKeyName;
- }
-
- /** @return Master key digest. */
- byte[] digest() {
- return digest;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (!(o instanceof MasterKeyChangeRequest))
- return false;
-
- MasterKeyChangeRequest key = (MasterKeyChangeRequest)o;
-
- return Arrays.equals(encKeyName, key.encKeyName) &&
- Arrays.equals(digest, key.digest) &&
- Objects.equals(reqId, key.reqId);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int res = Objects.hash(reqId);
-
- res = 31 * res + Arrays.hashCode(encKeyName);
- res = 31 * res + Arrays.hashCode(digest);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(MasterKeyChangeRequest.class, this);
- }
- }
-
/** */
protected static class NodeEncryptionKeys implements Serializable {
/** */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/MasterKeyChangeRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/MasterKeyChangeRequest.java
new file mode 100644
index 00000000000..e4f4ce375e4
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/MasterKeyChangeRequest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.encryption;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+
+/** Master key change request. */
+public class MasterKeyChangeRequest implements Message {
+ /** Request id. */
+ @Order(0)
+ UUID reqId;
+
+ /** Encrypted master key name. */
+ @Order(1)
+ byte[] encKeyName;
+
+ /** Master key digest. */
+ @Order(2)
+ byte[] digest;
+
+ /** Default constructor for {@link MessageFactory}. */
+ public MasterKeyChangeRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param reqId Request id.
+ * @param encKeyName Encrypted master key name.
+ * @param digest Master key digest.
+ */
+ public MasterKeyChangeRequest(UUID reqId, byte[] encKeyName, byte[]
digest) {
+ this.reqId = reqId;
+ this.encKeyName = encKeyName;
+ this.digest = digest;
+ }
+
+ /** @return Request id. */
+ UUID requestId() {
+ return reqId;
+ }
+
+ /** @return Encrypted master key name. */
+ byte[] encKeyName() {
+ return encKeyName;
+ }
+
+ /** @return Master key digest. */
+ byte[] digest() {
+ return digest;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof MasterKeyChangeRequest))
+ return false;
+
+ MasterKeyChangeRequest key = (MasterKeyChangeRequest)o;
+
+ return Arrays.equals(encKeyName, key.encKeyName) &&
+ Arrays.equals(digest, key.digest) &&
+ Objects.equals(reqId, key.reqId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = Objects.hash(reqId);
+
+ res = 31 * res + Arrays.hashCode(encKeyName);
+ res = 31 * res + Arrays.hashCode(digest);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 35;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MasterKeyChangeRequest.class, this);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotOperationRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotOperationRequest.java
index 3a161664a54..a12210c29cb 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotOperationRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotOperationRequest.java
@@ -17,46 +17,52 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-import java.io.Serializable;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.jetbrains.annotations.Nullable;
/**
* Snapshot operation start request for {@link DistributedProcess} initiate
message.
*/
-abstract class AbstractSnapshotOperationRequest implements Serializable {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
+abstract class AbstractSnapshotOperationRequest implements Message {
/** Request ID. */
- @GridToStringInclude
- private final UUID reqId;
+ @Order(0)
+ UUID reqId;
/** Snapshot name. */
- @GridToStringInclude
- private final String snpName;
+ @Order(1)
+ String snpName;
/** Snapshot directory path. */
- @GridToStringInclude
- private final String snpPath;
+ @Order(2)
+ String snpPath;
/** Collection of cache group names. */
+ @Order(3)
@GridToStringInclude
- private final Collection<String> grps;
+ Collection<String> grps;
/** Start time. */
- @GridToStringInclude
- private final long startTime;
+ @Order(4)
+ long startTime;
/** IDs of the nodes that must be alive to complete the operation. */
@GridToStringInclude
- private final Set<UUID> nodes;
+ @Order(5)
+ Set<UUID> nodes;
+
+ /** Default constructor for {@link MessageFactory}. */
+ public AbstractSnapshotOperationRequest() {
+ // No-op.
+ }
/**
* @param reqId Request ID.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index 7445cb07582..849e87175e3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -4123,41 +4123,6 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
}
}
- /** Snapshot operation start message. */
- private static class SnapshotStartDiscoveryMessage extends
InitMessage<SnapshotOperationRequest>
- implements SnapshotDiscoveryMessage {
- /** Serial version UID. */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final boolean needExchange;
-
- /**
- * @param procId Unique process id.
- * @param req Snapshot initial request.
- */
- public SnapshotStartDiscoveryMessage(UUID procId,
SnapshotOperationRequest req) {
- super(procId, START_SNAPSHOT, req, req.incremental());
-
- needExchange = !req.incremental();
- }
-
- /** {@inheritDoc} */
- @Override public boolean needExchange() {
- return needExchange;
- }
-
- /** {@inheritDoc} */
- @Override public boolean needAssignPartitions() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(SnapshotStartDiscoveryMessage.class, this,
super.toString());
- }
- }
-
/** */
public static class ClusterSnapshotFuture extends GridFutureAdapter<Void> {
/** Unique snapshot request id. */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
index ec646f25420..155c690a03d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
@@ -19,9 +19,11 @@ package
org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.util.Collection;
import java.util.UUID;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.jetbrains.annotations.Nullable;
/**
@@ -30,23 +32,28 @@ import org.jetbrains.annotations.Nullable;
* @see SnapshotCheckProcess
*/
public class SnapshotCheckProcessRequest extends
AbstractSnapshotOperationRequest {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
/** If {@code true}, additionally calculates partition hashes. Otherwise,
checks only snapshot integrity and partition counters. */
@GridToStringInclude
- private final boolean fullCheck;
+ @Order(0)
+ boolean fullCheck;
/**
* If {@code true}, all the registered {@link
IgniteSnapshotManager#handlers()} of type {@link SnapshotHandlerType#RESTORE}
* are invoked. Otherwise, only snapshot metadatas and partition hashes
are validated.
*/
@GridToStringInclude
- private final boolean allRestoreHandlers;
+ @Order(1)
+ boolean allRestoreHandlers;
/** Incremental snapshot index. If not positive, snapshot is not
considered as incremental. */
@GridToStringInclude
- private final int incIdx;
+ @Order(2)
+ int incIdx;
+
+ /** Default constructor for {@link MessageFactory}. */
+ public SnapshotCheckProcessRequest() {
+ // No-op.
+ }
/**
* Creates snapshot check process request.
@@ -100,6 +107,11 @@ public class SnapshotCheckProcessRequest extends
AbstractSnapshotOperationReques
return incIdx;
}
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 33;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SnapshotCheckProcessRequest.class, this,
super.toString());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationEndRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationEndRequest.java
index ce13f905857..c826b20f5f3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationEndRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationEndRequest.java
@@ -17,33 +17,41 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-import java.io.Serializable;
import java.util.List;
import java.util.UUID;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
import
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.jetbrains.annotations.Nullable;
/**
* Snapshot operation end request for {@link
DistributedProcessType#END_SNAPSHOT} initiate message.
*/
-public class SnapshotOperationEndRequest implements Serializable {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
+public class SnapshotOperationEndRequest implements Message {
/** Request ID. */
@GridToStringInclude
- private final UUID reqId;
+ @Order(0)
+ UUID reqId;
/** Exception occurred during snapshot operation processing. */
- @Nullable private final Throwable err;
+ @Order(1)
+ @Nullable ErrorMessage err;
/**
* Snapshot operation warnings. Warnings do not interrupt snapshot process
but raise exception at the end to make
* the operation status 'not OK' if no other error occurred.
*/
- @Nullable private final List<String> warnings;
+ @Order(2)
+ @Nullable List<String> warnings;
+
+ /** Default constructor for {@link MessageFactory}. */
+ public SnapshotOperationEndRequest() {
+ // No-op.
+ }
/**
* @param id Request ID.
@@ -52,7 +60,7 @@ public class SnapshotOperationEndRequest implements
Serializable {
*/
public SnapshotOperationEndRequest(UUID id, @Nullable Throwable err,
@Nullable List<String> warnings) {
reqId = id;
- this.err = err;
+ this.err = new ErrorMessage(err);
this.warnings = warnings;
}
@@ -63,7 +71,7 @@ public class SnapshotOperationEndRequest implements
Serializable {
/** @return Exception occurred during snapshot operation processing. */
@Nullable public Throwable error() {
- return err;
+ return ErrorMessage.error(err);
}
/** @return Warnings of snapshot operation. */
@@ -71,6 +79,11 @@ public class SnapshotOperationEndRequest implements
Serializable {
return warnings;
}
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 36;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SnapshotOperationEndRequest.class, this,
super.toString());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
index e4a04bec8a5..b6735591320 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
@@ -20,40 +20,52 @@ package
org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.util.Collection;
import java.util.Set;
import java.util.UUID;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.jetbrains.annotations.Nullable;
/**
* Snapshot operation end request for {@link
DistributedProcess.DistributedProcessType#START_SNAPSHOT} initiate message.
*/
public class SnapshotOperationRequest extends AbstractSnapshotOperationRequest
{
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
/** Operational node ID. */
- private final UUID opNodeId;
+ @Order(0)
+ UUID opNodeId;
/** If {@code true} then incremental snapshot requested. */
- private final boolean incremental;
+ @Order(1)
+ boolean incremental;
/** Index of incremental snapshot. */
- private final int incIdx;
+ @Order(2)
+ int incIdx;
/** If {@code true} snapshot only primary copies of partitions. */
- private final boolean onlyPrimary;
+ @Order(3)
+ boolean onlyPrimary;
/** If {@code true} then create dump. */
- private final boolean dump;
+ @Order(4)
+ boolean dump;
/** If {@code true} then compress partition files. */
- private final boolean compress;
+ @Order(5)
+ boolean compress;
/** If {@code true} then content of dump encrypted. */
- private final boolean encrypt;
+ @Order(6)
+ boolean encrypt;
/** If {@code true} then only cache config and metadata included in
snapshot. */
- private final boolean configOnly;
+ @Order(7)
+ boolean configOnly;
+
+ /** Default constructor for {@link MessageFactory}. */
+ public SnapshotOperationRequest() {
+ // No-op.
+ }
/**
* @param reqId Request ID.
@@ -139,8 +151,13 @@ public class SnapshotOperationRequest extends
AbstractSnapshotOperationRequest {
return configOnly;
}
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 34;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(SnapshotOperationRequest.class, this);
+ return S.toString(SnapshotOperationRequest.class, this,
super.toString());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index 84d12d3b24c..036dcdc1312 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -137,19 +137,19 @@ public class SnapshotRestoreProcess {
private final DistributedProcess<SnapshotOperationRequest,
SnapshotRestoreOperationResponse> prepareRestoreProc;
/** Cache group restore preload partitions phase. */
- private final DistributedProcess<UUID, Message> preloadProc;
+ private final DistributedProcess<SnapshotRestoreStartRequest, Message>
preloadProc;
/** Cache group restore cache start phase. */
- private final DistributedProcess<UUID, Message> cacheStartProc;
+ private final DistributedProcess<SnapshotRestoreStartRequest, Message>
cacheStartProc;
/** Cache group restore cache stop phase. */
- private final DistributedProcess<UUID, Message> cacheStopProc;
+ private final DistributedProcess<SnapshotRestoreStartRequest, Message>
cacheStopProc;
/** Incremental snapshot restore phase. */
- private final DistributedProcess<UUID, Message> incSnpRestoreProc;
+ private final DistributedProcess<SnapshotRestoreStartRequest, Message>
incSnpRestoreProc;
/** Cache group restore rollback phase. */
- private final DistributedProcess<UUID, Message> rollbackRestoreProc;
+ private final DistributedProcess<SnapshotRestoreStartRequest, Message>
rollbackRestoreProc;
/** Logger. */
private final IgniteLogger log;
@@ -861,7 +861,7 @@ public class SnapshotRestoreProcess {
opCtx0.cfgs = globalCfgs;
if (U.isLocalNodeCoordinator(ctx.discovery()))
- preloadProc.start(reqId, reqId);
+ preloadProc.start(reqId, new SnapshotRestoreStartRequest(reqId));
}
/**
@@ -909,18 +909,20 @@ public class SnapshotRestoreProcess {
}
/**
- * @param reqId Request id.
+ * @param req Request.
* @return Future which will be completed when the preload ends.
*/
- private IgniteInternalFuture<Message> preload(UUID reqId) {
+ private IgniteInternalFuture<Message> preload(SnapshotRestoreStartRequest
req) {
if (ctx.clientNode())
return new GridFinishedFuture<>();
SnapshotRestoreContext opCtx0 = opCtx;
GridFutureAdapter<Message> retFut = new GridFutureAdapter<>();
- if (opCtx0 == null)
- return new GridFinishedFuture<>(new
IgniteCheckedException("Snapshot restore process has incorrect restore state: "
+ reqId));
+ if (opCtx0 == null) {
+ return new GridFinishedFuture<>(new IgniteCheckedException(
+ "Snapshot restore process has incorrect restore state: " +
req.requestId()));
+ }
if (opCtx0.dirs.isEmpty())
return new GridFinishedFuture<>();
@@ -940,7 +942,7 @@ public class SnapshotRestoreProcess {
if (log.isInfoEnabled()) {
log.info("Starting snapshot preload operation to restore cache
groups " +
- "[reqId=" + reqId +
+ "[reqId=" + req.requestId() +
", snapshot=" + opCtx0.snpName +
", caches=" + F.transform(opCtx0.dirs.values(), s ->
NodeFileTree.cacheName(s.get(0))) + ']');
}
@@ -1057,7 +1059,7 @@ public class SnapshotRestoreProcess {
if (log.isInfoEnabled()) {
log.info("The snapshot was taken on the same
cluster topology. The index will be copied to " +
- "restoring cache group if necessary [reqId=" +
reqId + ", snapshot=" + opCtx0.snpName +
+ "restoring cache group if necessary [reqId=" +
req.requestId() + ", snapshot=" + opCtx0.snpName +
", dir=" + e.getValue().get(0).getName() +
']');
}
@@ -1093,7 +1095,7 @@ public class SnapshotRestoreProcess {
for (Map.Entry<UUID, Map<Integer, Set<Integer>>> m :
snpAff.entrySet()) {
if (log.isInfoEnabled()) {
log.info("Trying to request partitions from remote
node " +
- "[reqId=" + reqId +
+ "[reqId=" + req.requestId() +
", snapshot=" + opCtx0.snpName +
", nodeId=" + m.getKey() +
", grpParts=" +
partitionsMapToString(m.getValue(), cacheGrpNames) + "]");
@@ -1248,20 +1250,20 @@ public class SnapshotRestoreProcess {
if (failure != null) {
if (U.isLocalNodeCoordinator(ctx.discovery()))
- rollbackRestoreProc.start(reqId, reqId);
+ rollbackRestoreProc.start(reqId, new
SnapshotRestoreStartRequest(reqId));
return;
}
if (U.isLocalNodeCoordinator(ctx.discovery()))
- cacheStartProc.start(reqId, reqId);
+ cacheStartProc.start(reqId, new
SnapshotRestoreStartRequest(reqId));
}
/**
- * @param reqId Request ID.
+ * @param req Request.
* @return Result future.
*/
- private IgniteInternalFuture<Message> cacheStart(UUID reqId) {
+ private IgniteInternalFuture<Message>
cacheStart(SnapshotRestoreStartRequest req) {
if (ctx.clientNode())
return new GridFinishedFuture<>();
@@ -1285,7 +1287,7 @@ public class SnapshotRestoreProcess {
// We set the topology node IDs required to successfully start the
cache, if any of the required nodes leave
// the cluster during the cache startup, the whole procedure will be
rolled back.
- return ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true, true,
false, IgniteUuid.fromUuid(reqId))
+ return ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true, true,
false, IgniteUuid.fromUuid(req.requestId()))
.chain(fut -> {
if (fut.error() != null)
throw F.wrap(fut.error());
@@ -1311,7 +1313,7 @@ public class SnapshotRestoreProcess {
if (failure == null) {
if (opCtx0.incIdx > 0) {
if (U.isLocalNodeCoordinator(ctx.discovery()))
- incSnpRestoreProc.start(reqId, reqId);
+ incSnpRestoreProc.start(reqId, new
SnapshotRestoreStartRequest(reqId));
return;
}
@@ -1324,18 +1326,18 @@ public class SnapshotRestoreProcess {
opCtx0.err.compareAndSet(null, failure);
if (U.isLocalNodeCoordinator(ctx.discovery()))
- cacheStopProc.start(reqId, reqId);
+ cacheStopProc.start(reqId, new SnapshotRestoreStartRequest(reqId));
}
/**
- * @param reqId Request ID.
+ * @param req Request.
* @return Result future.
*/
- private IgniteInternalFuture<Message> cacheStop(UUID reqId) {
+ private IgniteInternalFuture<Message>
cacheStop(SnapshotRestoreStartRequest req) {
if (!U.isLocalNodeCoordinator(ctx.discovery()))
return new GridFinishedFuture<>();
- assert opCtx.reqId == reqId;
+ assert opCtx.reqId == req.requestId();
SnapshotRestoreContext opCtx0 = opCtx;
@@ -1345,7 +1347,7 @@ public class SnapshotRestoreProcess {
.collect(Collectors.toSet());
if (log.isInfoEnabled())
- log.info("Stopping caches [reqId=" + reqId + ", caches=" +
stopCaches + ']');
+ log.info("Stopping caches [reqId=" + req.requestId() + ", caches="
+ stopCaches + ']');
// Skip deleting cache files as they will be removed during rollback.
return ctx.cache().dynamicDestroyCaches(stopCaches, false, false)
@@ -1373,16 +1375,16 @@ public class SnapshotRestoreProcess {
}
if (U.isLocalNodeCoordinator(ctx.discovery()))
- rollbackRestoreProc.start(reqId, reqId);
+ rollbackRestoreProc.start(reqId, new
SnapshotRestoreStartRequest(reqId));
}
/**
* Inits restoring incremental snapshot.
*
- * @param reqId Request ID.
+ * @param req Request ID.
* @return Result future.
*/
- private IgniteInternalFuture<Message> incrementalSnapshotRestore(UUID
reqId) {
+ private IgniteInternalFuture<Message> incrementalSnapshotRestore(Message
req) {
SnapshotRestoreContext opCtx0 = opCtx;
if (ctx.clientNode() || opCtx0 == null ||
!opCtx0.nodes().contains(ctx.localNodeId()))
@@ -1575,7 +1577,7 @@ public class SnapshotRestoreProcess {
opCtx0.err.compareAndSet(null, failure);
if (U.isLocalNodeCoordinator(ctx.discovery()))
- cacheStopProc.start(reqId, reqId);
+ cacheStopProc.start(reqId, new SnapshotRestoreStartRequest(reqId));
}
/**
@@ -1647,10 +1649,10 @@ public class SnapshotRestoreProcess {
}
/**
- * @param reqId Request ID.
+ * @param req Request.
* @return Result future.
*/
- private IgniteInternalFuture<Message> rollback(UUID reqId) {
+ private IgniteInternalFuture<Message> rollback(SnapshotRestoreStartRequest
req) {
if (ctx.clientNode())
return new GridFinishedFuture<>();
@@ -1668,7 +1670,7 @@ public class SnapshotRestoreProcess {
try {
ctx.cache().context().snapshotMgr().snapshotExecutorService().execute(() -> {
if (log.isInfoEnabled()) {
- log.info("Removing restored cache directories [reqId=" +
reqId +
+ log.info("Removing restored cache directories [reqId=" +
req.requestId() +
", snapshot=" + opCtx0.snpName + ", dirs=" +
opCtx0.dirs.values() + ']');
}
@@ -1680,14 +1682,14 @@ public class SnapshotRestoreProcess {
if (tmpCacheDir.exists() && !U.delete(tmpCacheDir)) {
log.error("Unable to perform rollback routine
completely, cannot remove temp directory " +
- "[reqId=" + reqId + ", snapshot=" +
opCtx0.snpName + ", dir=" + tmpCacheDir + ']');
+ "[reqId=" + req.requestId() + ", snapshot=" +
opCtx0.snpName + ", dir=" + tmpCacheDir + ']');
ex = new IgniteCheckedException("Unable to remove
temporary cache directory " + cacheDir);
}
if (cacheDir.exists() && !U.delete(cacheDir)) {
log.error("Unable to perform rollback routine
completely, cannot remove cache directory " +
- "[reqId=" + reqId + ", snapshot=" +
opCtx0.snpName + ", dir=" + cacheDir + ']');
+ "[reqId=" + req.requestId() + ", snapshot=" +
opCtx0.snpName + ", dir=" + cacheDir + ']');
ex = new IgniteCheckedException("Unable to remove
cache directory " + cacheDir);
}
@@ -1702,7 +1704,7 @@ public class SnapshotRestoreProcess {
}
catch (RejectedExecutionException e) {
log.error("Unable to perform rollback routine, task has been
rejected " +
- "[reqId=" + reqId + ", snapshot=" + opCtx0.snpName + ']');
+ "[reqId=" + req.requestId() + ", snapshot=" + opCtx0.snpName +
']');
retFut.onDone(e);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStartRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStartRequest.java
new file mode 100644
index 00000000000..b2e10f0ac7b
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreStartRequest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.UUID;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+
+/** */
+public class SnapshotRestoreStartRequest implements Message {
+ /** Request id. */
+ @Order(0)
+ UUID reqId;
+
+ /** Default constructor for {@link MessageFactory}. */
+ public SnapshotRestoreStartRequest() {
+ // No-op.
+ }
+
+ /** */
+ public SnapshotRestoreStartRequest(UUID reqId) {
+ this.reqId = reqId;
+ }
+
+ /** @return Request ID. */
+ public UUID requestId() {
+ return reqId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 37;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SnapshotRestoreStartRequest.class, this);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotStartDiscoveryMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotStartDiscoveryMessage.java
new file mode 100644
index 00000000000..9c3ca28d73e
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotStartDiscoveryMessage.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.UUID;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.util.distributed.InitMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+
+import static
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.START_SNAPSHOT;
+
+/** Snapshot operation start message. */
+public class SnapshotStartDiscoveryMessage extends
InitMessage<SnapshotOperationRequest> implements SnapshotDiscoveryMessage {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ @Order(0)
+ boolean needExchange;
+
+ /** Default constructor for {@link MessageFactory}. */
+ public SnapshotStartDiscoveryMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param procId Unique process id.
+ * @param req Snapshot initial request.
+ */
+ public SnapshotStartDiscoveryMessage(UUID procId, SnapshotOperationRequest
req) {
+ super(procId, START_SNAPSHOT, req, req.incremental());
+
+ needExchange = !req.incremental();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean needExchange() {
+ return needExchange;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean needAssignPartitions() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 32;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SnapshotStartDiscoveryMessage.class, this,
super.toString());
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java
index 593969d74e5..99467083712 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.performancestatistics;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EventListener;
@@ -76,7 +75,7 @@ public class PerformanceStatisticsProcessor extends
GridProcessorAdapter {
private final ArrayList<PerformanceStatisticsStateListener> lsnrs = new
ArrayList<>();
/** Rotate performance statistics process. */
- private DistributedProcess<Serializable, Message> rotateProc;
+ private DistributedProcess<Message, Message> rotateProc;
/** Whether performance statistics collection is running. */
private volatile boolean isRunning;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
index ee0a93945a5..73298b6696e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.util.distributed;
-import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -72,7 +71,7 @@ import static
org.apache.ignite.internal.util.lang.ClusterNodeFunc.node2id;
* @see InitMessage
* @see FullMessage
*/
-public class DistributedProcess<I extends Serializable, R extends Message> {
+public class DistributedProcess<I extends Message, R extends Message> {
/** Process type. */
private final DistributedProcessType type;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/InitMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/InitMessage.java
index f98e2715aae..8eec7bf33b8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/InitMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/InitMessage.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.util.distributed;
-import java.io.Serializable;
import java.util.UUID;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
@@ -26,6 +26,8 @@ import
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.jetbrains.annotations.Nullable;
/**
@@ -36,24 +38,34 @@ import org.jetbrains.annotations.Nullable;
* @see FullMessage
* @see SingleNodeMessage
*/
-public class InitMessage<I extends Serializable> implements
DiscoveryCustomMessage {
+public class InitMessage<I extends Message> implements Message,
DiscoveryCustomMessage {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
/** Custom message ID. */
- private final IgniteUuid id = IgniteUuid.randomUuid();
+ @Order(0)
+ public IgniteUuid id;
/** Process id. */
- private final UUID procId;
+ @Order(1)
+ public UUID procId;
/** Process type. */
- private final int type;
+ @Order(2)
+ public int type;
/** Request. */
- private final I req;
+ @Order(3)
+ public Message req;
/** Whether coordinator waits client nodes results. */
- private final boolean waitClnRes;
+ @Order(4)
+ public boolean waitClnRes;
+
+ /** Default constructor for {@link MessageFactory}. */
+ public InitMessage() {
+ // No-op.
+ }
/**
* @param procId Process id.
@@ -61,6 +73,7 @@ public class InitMessage<I extends Serializable> implements
DiscoveryCustomMessa
* @param req Request.
*/
public InitMessage(UUID procId, DistributedProcessType type, I req,
boolean waitClnRes) {
+ this.id = IgniteUuid.randomUuid();
this.procId = procId;
this.type = type.ordinal();
this.req = req;
@@ -95,7 +108,7 @@ public class InitMessage<I extends Serializable> implements
DiscoveryCustomMessa
/** @return Request. */
public I request() {
- return req;
+ return (I)req;
}
/** @return Whether coordinator waits client nodes results. */
@@ -107,4 +120,9 @@ public class InitMessage<I extends Serializable> implements
DiscoveryCustomMessa
@Override public String toString() {
return S.toString(InitMessage.class, this);
}
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 31;
+ }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/encryption/CacheGroupKeyChangeTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/encryption/CacheGroupKeyChangeTest.java
index 0e08ddd281f..f8d336b395c 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/encryption/CacheGroupKeyChangeTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/encryption/CacheGroupKeyChangeTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.encryption;
-import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -55,6 +54,7 @@ import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils.DiscoveryHook;
@@ -1029,7 +1029,7 @@ public class CacheGroupKeyChangeTest extends
AbstractEncryptionTest {
if (!(customMsg instanceof InitMessage))
return;
- InitMessage<Serializable> msg =
(InitMessage<Serializable>)customMsg;
+ InitMessage<Message> msg = (InitMessage<Message>)customMsg;
if (msg.type() != type.ordinal())
return;
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
index d002ea6466a..f3edd6df687 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java
@@ -617,7 +617,7 @@ public class IgniteSnapshotManagerSelfTest extends
AbstractSnapshotSelfTest {
LogListener matchFinish = LogListener.matches("Cluster-wide snapshot
operation finished successfully: ").times(entriesCnt).build();
listenLog.registerListener(matchFinish);
- LogListener matchFullParams = LogListener.matches("incremental=false,
incIdx=-1").times(4).build();
+ LogListener matchFullParams = LogListener.matches("incremental=false,
incIdx=-1").times(5).build();
listenLog.registerListener(matchFullParams);
LogListener matchIncParams =
LogListener.matches("incremental=true").times(4 * (entriesCnt - 1)).build();
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessClientAwaitTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessClientAwaitTest.java
index 18bbf5a6d2f..5fc16e6534b 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessClientAwaitTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessClientAwaitTest.java
@@ -104,7 +104,7 @@ public class DistributedProcessClientAwaitTest extends
GridCommonAbstractTest {
public void testSkipWaitingFailedClient() throws Exception {
finishLatchRef.set(new CountDownLatch(NODES_CNT));
- List<DistributedProcess<Integer, Message>> processes = new
ArrayList<>(NODES_CNT + 1);
+ List<DistributedProcess<Message, Message>> processes = new
ArrayList<>(NODES_CNT + 1);
TestRecordingCommunicationSpi clnCommSpi =
TestRecordingCommunicationSpi.spi(grid(NODES_CNT));
clnCommSpi.blockMessages((node, msg) -> msg instanceof
SingleNodeMessage);
@@ -115,13 +115,13 @@ public class DistributedProcessClientAwaitTest extends
GridCommonAbstractTest {
nodeIdsRes.add(grid(i).localNode().id());
for (int n = 0; n < NODES_CNT + 1; n++) {
- DistributedProcess<Integer, Message> dp = new
TestDistributedProcess(
+ DistributedProcess<Message, Message> dp = new
TestDistributedProcess(
nodeIdsRes, grid(n).context(), (id, req) -> new
InitMessage<>(id, TEST_PROCESS, req, true));
processes.add(dp);
}
- processes.get(0).start(UUID.randomUUID(), 0);
+ processes.get(0).start(UUID.randomUUID(), null);
clnCommSpi.waitForBlocked();
@@ -137,7 +137,7 @@ public class DistributedProcessClientAwaitTest extends
GridCommonAbstractTest {
public void testChangedCoordinatorAwaitsClientResult() throws Exception {
finishLatchRef.set(new CountDownLatch(NODES_CNT));
- List<DistributedProcess<Integer, Message>> processes = new
ArrayList<>(NODES_CNT + 1);
+ List<DistributedProcess<Message, Message>> processes = new
ArrayList<>(NODES_CNT + 1);
TestRecordingCommunicationSpi clnCommSpi =
TestRecordingCommunicationSpi.spi(grid(NODES_CNT));
clnCommSpi.blockMessages((node, msg) -> msg instanceof
SingleNodeMessage);
@@ -148,13 +148,13 @@ public class DistributedProcessClientAwaitTest extends
GridCommonAbstractTest {
nodeIdsRes.add(grid(i).localNode().id());
for (int n = 0; n < NODES_CNT + 1; n++) {
- DistributedProcess<Integer, Message> dp = new
TestDistributedProcess(
+ DistributedProcess<Message, Message> dp = new
TestDistributedProcess(
nodeIdsRes, grid(n).context(), (id, req) -> new
InitMessage<>(id, TEST_PROCESS, req, true));
processes.add(dp);
}
- processes.get(0).start(UUID.randomUUID(), 0);
+ processes.get(0).start(UUID.randomUUID(), null);
clnCommSpi.waitForBlocked();
@@ -172,12 +172,12 @@ public class DistributedProcessClientAwaitTest extends
GridCommonAbstractTest {
/** */
private void checkExpectedResults(
Set<UUID> expNodeIdRes,
- BiFunction<UUID, Integer, ? extends InitMessage<Integer>>
initMsgFactory
+ BiFunction<UUID, Message, ? extends InitMessage<Message>>
initMsgFactory
) throws Exception {
- List<DistributedProcess<Integer, Message>> processes = new
ArrayList<>(NODES_CNT + 1);
+ List<DistributedProcess<Message, Message>> processes = new
ArrayList<>(NODES_CNT + 1);
for (int n = 0; n < NODES_CNT + 1; n++) {
- DistributedProcess<Integer, Message> dp = new
TestDistributedProcess(
+ DistributedProcess<Message, Message> dp = new
TestDistributedProcess(
expNodeIdRes, grid(n).context(), initMsgFactory);
processes.add(dp);
@@ -187,7 +187,7 @@ public class DistributedProcessClientAwaitTest extends
GridCommonAbstractTest {
failRef.set(null);
finishLatchRef.set(new CountDownLatch(NODES_CNT + 1));
- processes.get(n).start(UUID.randomUUID(), 0);
+ processes.get(n).start(UUID.randomUUID(), null);
finishLatchRef.get().await(getTestTimeout(), MILLISECONDS);
@@ -196,12 +196,12 @@ public class DistributedProcessClientAwaitTest extends
GridCommonAbstractTest {
}
/** */
- private static class TestDistributedProcess extends
DistributedProcess<Integer, Message> {
+ private static class TestDistributedProcess extends
DistributedProcess<Message, Message> {
/** */
public TestDistributedProcess(
Set<UUID> expNodeIdsRes,
GridKernalContext ctx,
- BiFunction<UUID, Integer, ? extends InitMessage<Integer>>
initMsgFactory
+ BiFunction<UUID, Message, ? extends InitMessage<Message>>
initMsgFactory
) {
super(
ctx,
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessCoordinatorLeftTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessCoordinatorLeftTest.java
index 13203ae8bee..e907d130bc0 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessCoordinatorLeftTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessCoordinatorLeftTest.java
@@ -112,12 +112,12 @@ public class DistributedProcessCoordinatorLeftTest
extends GridCommonAbstractTes
CountDownLatch startLatch = new CountDownLatch(NODES_CNT);
CountDownLatch finishLatch = new CountDownLatch(NODES_CNT - 1);
- HashMap<String, DistributedProcess<Integer, TestIntegerMessage>>
processes = new HashMap<>();
+ HashMap<String, DistributedProcess<TestIntegerMessage,
TestIntegerMessage>> processes = new HashMap<>();
int procRes = 1;
for (Ignite grid : G.allGrids()) {
- DistributedProcess<Integer, TestIntegerMessage> dp = new
DistributedProcess<>(((IgniteEx)grid).context(),
+ DistributedProcess<TestIntegerMessage, TestIntegerMessage> dp =
new DistributedProcess<>(((IgniteEx)grid).context(),
TEST_PROCESS,
req -> {
IgniteInternalFuture<TestIntegerMessage> fut = runAsync(()
-> {
@@ -128,7 +128,7 @@ public class DistributedProcessCoordinatorLeftTest extends
GridCommonAbstractTes
fail("Unexpected interrupt.");
}
- return new TestIntegerMessage(req);
+ return new TestIntegerMessage(req.value());
});
// A single message will be sent before this latch
released.
@@ -151,7 +151,7 @@ public class DistributedProcessCoordinatorLeftTest extends
GridCommonAbstractTes
processes.put(grid.name(), dp);
}
- processes.get(grid(STOP_NODE_IDX).name()).start(UUID.randomUUID(),
procRes);
+ processes.get(grid(STOP_NODE_IDX).name()).start(UUID.randomUUID(), new
TestIntegerMessage(procRes));
assertTrue(startLatch.await(TIMEOUT, MILLISECONDS));
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessErrorHandlingTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessErrorHandlingTest.java
index 7254c47cdb4..6ffb7cc7988 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessErrorHandlingTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/DistributedProcessErrorHandlingTest.java
@@ -137,16 +137,16 @@ public class DistributedProcessErrorHandlingTest extends
GridCommonAbstractTest
/** */
private void checkDistributedProcess(
- BiFunction<IgniteEx, CountDownLatch, DistributedProcess<Integer,
Message>> processFactory
+ BiFunction<IgniteEx, CountDownLatch, DistributedProcess<Message,
Message>> processFactory
) throws Exception {
- DistributedProcess<Integer, Message> proc = null;
+ DistributedProcess<Message, Message> proc = null;
CountDownLatch latch = new CountDownLatch(SRV_NODES + 1);
for (Ignite g: G.allGrids())
proc = processFactory.apply((IgniteEx)g, latch);
- proc.start(UUID.randomUUID(), 0);
+ proc.start(UUID.randomUUID(), null);
assertTrue(latch.await(5, TimeUnit.SECONDS));