This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 18c177b264b IGNITE-28607 Use Message DTO for
DiscoveryDataBag#GridDiscoveryData (#13157)
18c177b264b is described below
commit 18c177b264b972b64d6a34ffe685e7981d534180
Author: Ilya Shishkov <[email protected]>
AuthorDate: Wed Jun 24 12:26:32 2026 +0300
IGNITE-28607 Use Message DTO for DiscoveryDataBag#GridDiscoveryData (#13157)
---
.../ignite/internal/CoreMessagesProvider.java | 30 +-
.../ignite/internal/GridPluginComponent.java | 10 +-
.../encryption/EncryptionDataBagItem.java} | 16 +-
.../managers/encryption/GridEncryptionManager.java | 15 +-
.../eventstorage/EventsDataBagItem.java} | 17 +-
.../eventstorage/GridEventStorageManager.java | 10 +-
.../AuthentificationDataBagItem.java | 55 ++++
.../IgniteAuthenticationProcessor.java | 37 +--
.../authentication/UserManagementOperation.java | 6 +-
.../processors/cache/ClusterCachesInfo.java | 5 +-
...rsionsData.java => CacheBinaryDataBagItem.java} | 12 +-
.../binary/CacheObjectBinaryProcessorImpl.java | 23 +-
.../processors/cluster/ClusterIdAndTag.java | 13 +-
.../processors/cluster/ClusterProcessor.java | 60 ++--
.../ClusterUpdateNotifierDataBagItem.java} | 19 +-
.../cluster/GridClusterStateProcessor.java | 8 +-
.../continuous/GridContinuousProcessor.java | 9 +-
.../marshaller/GridMarshallerMappingProcessor.java | 20 +-
...appingsData.java => MarshallerDataBagItem.java} | 15 +-
.../persistence/DistributedMetaStorageImpl.java | 2 +-
.../processors/plugin/IgnitePluginProcessor.java | 34 +--
.../processors/plugin/PluginsDataBagItem.java} | 53 ++--
.../processors/query/GridQueryProcessor.java | 65 ++---
.../message/QueryInlineSizesDataBagItem.java} | 17 +-
.../message/QueryProposalsDataBagItem.java} | 30 +-
.../message/SchemaAbstractDiscoveryMessage.java | 10 +-
.../message/SchemaFinishDiscoveryMessage.java | 3 -
.../message/SchemaProposeDiscoveryMessage.java | 3 -
.../processors/service/IgniteServiceProcessor.java | 2 +-
.../ignite/spi/discovery/DiscoveryDataBag.java | 69 +++--
...ta.java => SerializableDataBagItemWrapper.java} | 55 +++-
.../ignite/spi/discovery/tcp/ServerImpl.java | 42 +--
.../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 34 +--
.../tcp/internal/DiscoveryDataPacket.java | 316 ++++++---------------
.../discovery/DiscoverySpiDataExchangeTest.java | 6 +
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 12 +-
.../discovery/zk/internal/ZkBulkJoinContext.java | 8 +-
.../zk/internal/ZkDiscoDataBagWrapper.java | 77 +++++
.../discovery/zk/internal/ZkMessageFactory.java | 1 +
.../zk/internal/ZookeeperDiscoveryImpl.java | 16 +-
40 files changed, 572 insertions(+), 663 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
index cc0ab3e112b..c0191788706 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
@@ -37,13 +37,16 @@ import
org.apache.ignite.internal.managers.deployment.GridDeploymentRequest;
import org.apache.ignite.internal.managers.deployment.GridDeploymentResponse;
import
org.apache.ignite.internal.managers.discovery.SecurityAwareCustomMessageWrapper;
import
org.apache.ignite.internal.managers.encryption.ChangeCacheEncryptionRequest;
+import org.apache.ignite.internal.managers.encryption.EncryptionDataBagItem;
import
org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyRequest;
import
org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse;
import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
import org.apache.ignite.internal.managers.encryption.MasterKeyChangeRequest;
import org.apache.ignite.internal.managers.encryption.NodeEncryptionKeys;
+import org.apache.ignite.internal.managers.eventstorage.EventsDataBagItem;
import
org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage;
import
org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider;
+import
org.apache.ignite.internal.processors.authentication.AuthentificationDataBagItem;
import org.apache.ignite.internal.processors.authentication.User;
import
org.apache.ignite.internal.processors.authentication.UserAcceptedMessage;
import
org.apache.ignite.internal.processors.authentication.UserAuthenticateRequestMessage;
@@ -72,7 +75,7 @@ import
org.apache.ignite.internal.processors.cache.WalStateAckMessage;
import org.apache.ignite.internal.processors.cache.WalStateFinishMessage;
import org.apache.ignite.internal.processors.cache.WalStateProposeMessage;
import
org.apache.ignite.internal.processors.cache.binary.BinaryMetadataVersionInfo;
-import
org.apache.ignite.internal.processors.cache.binary.BinaryMetadataVersionsData;
+import
org.apache.ignite.internal.processors.cache.binary.CacheBinaryDataBagItem;
import
org.apache.ignite.internal.processors.cache.binary.MetadataRemoveAcceptedMessage;
import
org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposedMessage;
import
org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage;
@@ -181,7 +184,9 @@ import
org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
import org.apache.ignite.internal.processors.cluster.CacheMetricsMessage;
import
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.processors.cluster.ClusterIdAndTag;
import
org.apache.ignite.internal.processors.cluster.ClusterMetricsUpdateMessage;
+import
org.apache.ignite.internal.processors.cluster.ClusterUpdateNotifierDataBagItem;
import org.apache.ignite.internal.processors.cluster.NodeFullMetricsMessage;
import org.apache.ignite.internal.processors.cluster.NodeMetricsMessage;
import
org.apache.ignite.internal.processors.continuous.ContinuousRoutineStartResultMessage;
@@ -197,15 +202,15 @@ import
org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse;
import org.apache.ignite.internal.processors.marshaller.MappedName;
import org.apache.ignite.internal.processors.marshaller.MappingAcceptedMessage;
import org.apache.ignite.internal.processors.marshaller.MappingProposedMessage;
+import org.apache.ignite.internal.processors.marshaller.MarshallerDataBagItem;
import org.apache.ignite.internal.processors.marshaller.MarshallerMappingItem;
-import org.apache.ignite.internal.processors.marshaller.MarshallerMappingsData;
import
org.apache.ignite.internal.processors.marshaller.MissingMappingRequestMessage;
import
org.apache.ignite.internal.processors.marshaller.MissingMappingResponseMessage;
import
org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasAckMessage;
import
org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasMessage;
import
org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateAckMessage;
import
org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateMessage;
-import org.apache.ignite.internal.processors.query.InlineSizesData;
+import org.apache.ignite.internal.processors.plugin.PluginsDataBagItem;
import org.apache.ignite.internal.processors.query.QueryField;
import
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
import
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
@@ -213,6 +218,8 @@ import
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery
import
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
import
org.apache.ignite.internal.processors.query.messages.GridQueryKillRequest;
import
org.apache.ignite.internal.processors.query.messages.GridQueryKillResponse;
+import
org.apache.ignite.internal.processors.query.schema.message.QueryInlineSizesDataBagItem;
+import
org.apache.ignite.internal.processors.query.schema.message.QueryProposalsDataBagItem;
import
org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
import
org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage;
import
org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
@@ -253,7 +260,7 @@ import
org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
import org.apache.ignite.spi.communication.tcp.messages.HandshakeWaitMessage;
import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage;
import
org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage;
-import org.apache.ignite.spi.discovery.ObjectData;
+import org.apache.ignite.spi.discovery.SerializableDataBagItemWrapper;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
@@ -353,7 +360,7 @@ public class CoreMessagesProvider extends
AbstractMarshallableMessageFactoryProv
withNoSchema(GridCacheVersion.class);
withNoSchema(GridCacheVersionEx.class);
withNoSchema(WALPointer.class);
- withNoSchemaResolvedClassLoader(ObjectData.class);
+ withNoSchemaResolvedClassLoader(SerializableDataBagItemWrapper.class);
withSchemaResolvedClassLoader(GridTopicMessage.class);
// [5700 - 5900]: Discovery originated messages.
@@ -574,7 +581,8 @@ public class CoreMessagesProvider extends
AbstractMarshallableMessageFactoryProv
withNoSchema(StatisticsResponse.class);
withNoSchema(CacheContinuousQueryBatchAck.class);
withSchema(CacheContinuousQueryEntry.class);
- withNoSchema(InlineSizesData.class);
+ withNoSchema(QueryInlineSizesDataBagItem.class);
+ withSchema(QueryProposalsDataBagItem.class);
// [11200 - 11300]: Compute, distributed process messages.
msgIdx = 11200;
@@ -628,6 +636,7 @@ public class CoreMessagesProvider extends
AbstractMarshallableMessageFactoryProv
withNoSchema(UserAuthenticateRequestMessage.class);
withNoSchema(UserAuthenticateResponseMessage.class);
withNoSchema(TcpDiscoveryAuthFailedMessage.class);
+ withSchema(AuthentificationDataBagItem.class);
// [12200 - 12300]: Binary, classloading and marshalling messages.
msgIdx = 12200;
@@ -640,9 +649,9 @@ public class CoreMessagesProvider extends
AbstractMarshallableMessageFactoryProv
withNoSchema(MetadataResponseMessage.class);
withNoSchema(MarshallerMappingItem.class);
withSchemaResolvedClassLoader(BinaryMetadataVersionInfo.class);
- withNoSchema(BinaryMetadataVersionsData.class);
+ withNoSchema(CacheBinaryDataBagItem.class);
withNoSchema(MappedName.class);
- withNoSchema(MarshallerMappingsData.class);
+ withNoSchema(MarshallerDataBagItem.class);
// [12400 - 12500]: Encryption messages.
msgIdx = 12400;
@@ -652,6 +661,7 @@ public class CoreMessagesProvider extends
AbstractMarshallableMessageFactoryProv
withNoSchema(MasterKeyChangeRequest.class);
withNoSchema(GroupKeyEncrypted.class);
withNoSchema(NodeEncryptionKeys.class);
+ withNoSchema(EncryptionDataBagItem.class);
// [13000 - 13300]: Control, configuration, diagnostincs and other
messages.
msgIdx = 13000;
@@ -665,6 +675,10 @@ public class CoreMessagesProvider extends
AbstractMarshallableMessageFactoryProv
withNoSchemaResolvedClassLoader(DynamicCacheChangeRequest.class);
withNoSchema(PartitionHashRecord.class);
withNoSchema(TransactionsHashRecord.class);
+ withNoSchema(ClusterIdAndTag.class);
+ withNoSchema(ClusterUpdateNotifierDataBagItem.class);
+ withNoSchemaResolvedClassLoader(PluginsDataBagItem.class);
+ withSchema(EventsDataBagItem.class);
// [13400 - 13600]: Operation context messages.
msgIdx = 13400;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
index f0e1a7c7b62..56795c7ce11 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java
@@ -17,10 +17,10 @@
package org.apache.ignite.internal;
-import java.io.Serializable;
-import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.plugin.PluginsDataBagItem;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.plugin.PluginValidationException;
@@ -115,10 +115,10 @@ public class GridPluginComponent implements GridComponent
{
@Nullable @Override public IgniteNodeValidationResult
validateNode(ClusterNode node,
JoiningNodeDiscoveryData discoData) {
try {
- Map<String, Serializable> map = discoData.joiningNodeData();
+ PluginsDataBagItem pluginsItem = discoData.joiningNodeData();
- if (map != null)
- plugin.validateNewNode(node, map.get(plugin.name()));
+ if (pluginsItem != null && !F.isEmpty(pluginsItem.data()))
+ plugin.validateNewNode(node,
pluginsItem.data().get(plugin.name()));
else
plugin.validateNewNode(node, null);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/EncryptionDataBagItem.java
similarity index 75%
copy from
modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java
copy to
modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/EncryptionDataBagItem.java
index eb3813501f6..e4ea26f2914 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/EncryptionDataBagItem.java
@@ -15,25 +15,23 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query;
+package org.apache.ignite.internal.managers.encryption;
import java.util.Map;
import org.apache.ignite.internal.Order;
import org.apache.ignite.plugin.extensions.communication.Message;
/** */
-public class InlineSizesData implements Message {
+public class EncryptionDataBagItem implements Message {
/** */
@Order(0)
- Map<String, Integer> sizes;
+ Map<Integer, GroupKeyEncrypted> knownKeys;
/** */
- public InlineSizesData() {}
+ public EncryptionDataBagItem() {}
- /**
- * @param sizes Inline sizes.
- */
- public InlineSizesData(Map<String, Integer> sizes) {
- this.sizes = sizes;
+ /** */
+ EncryptionDataBagItem(Map<Integer, GroupKeyEncrypted> knownKeys) {
+ this.knownKeys = knownKeys;
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
index 19c58d5d7b8..80d6c11e98a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
@@ -563,7 +563,7 @@ public class GridEncryptionManager extends
GridManagerAdapter<EncryptionSpi> imp
}
}
- dataBag.addGridCommonData(ENCRYPTION_MGR.ordinal(), knownEncKeys);
+ dataBag.addGridCommonData(ENCRYPTION_MGR.ordinal(), new
EncryptionDataBagItem(knownEncKeys));
}
/** {@inheritDoc} */
@@ -571,20 +571,15 @@ public class GridEncryptionManager extends
GridManagerAdapter<EncryptionSpi> imp
if (ctx.clientNode())
return;
- Map<Integer, Object> encKeysFromCluster = (Map<Integer,
Object>)data.commonData();
+ EncryptionDataBagItem encryptionItem = data.commonData();
- if (F.isEmpty(encKeysFromCluster))
+ if (encryptionItem == null || F.isEmpty(encryptionItem.knownKeys))
return;
- for (Map.Entry<Integer, Object> entry : encKeysFromCluster.entrySet())
{
+ for (Map.Entry<Integer, GroupKeyEncrypted> entry :
encryptionItem.knownKeys.entrySet()) {
int grpId = entry.getKey();
- GroupKeyEncrypted rmtKey;
-
- if (entry.getValue() instanceof GroupKeyEncrypted)
- rmtKey = (GroupKeyEncrypted)entry.getValue();
- else
- rmtKey = new GroupKeyEncrypted(INITIAL_KEY_ID,
(byte[])entry.getValue());
+ GroupKeyEncrypted rmtKey = entry.getValue();
GroupKey locGrpKey = getActiveKey(grpId);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/EventsDataBagItem.java
similarity index 74%
copy from
modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java
copy to
modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/EventsDataBagItem.java
index eb3813501f6..da409dc0d58 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/EventsDataBagItem.java
@@ -15,25 +15,22 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query;
+package org.apache.ignite.internal.managers.eventstorage;
-import java.util.Map;
import org.apache.ignite.internal.Order;
import org.apache.ignite.plugin.extensions.communication.Message;
/** */
-public class InlineSizesData implements Message {
+public class EventsDataBagItem implements Message {
/** */
@Order(0)
- Map<String, Integer> sizes;
+ int[] enabledEvts;
/** */
- public InlineSizesData() {}
+ public EventsDataBagItem() { }
- /**
- * @param sizes Inline sizes.
- */
- public InlineSizesData(Map<String, Integer> sizes) {
- this.sizes = sizes;
+ /** @param enabledEvts Enabled events. */
+ public EventsDataBagItem(int[] enabledEvts) {
+ this.enabledEvts = enabledEvts;
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index 1f96aa0a316..618ad6bb05e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -1173,13 +1173,15 @@ public class GridEventStorageManager extends
GridManagerAdapter<EventStorageSpi>
/** {@inheritDoc} */
@Override public void
onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
- if (data.commonData() == null)
+ EventsDataBagItem evtsItem = data.commonData();
+
+ if (evtsItem == null)
return;
if (ctx.clientNode())
return;
- GridIntList clusterData = new GridIntList((int[])data.commonData());
+ GridIntList clusterData = new GridIntList(evtsItem.enabledEvts);
GridIntList nodeData = new GridIntList(enabledEvents());
GridIntList toEnable = new GridIntList(clusterData.size());
@@ -1207,9 +1209,7 @@ public class GridEventStorageManager extends
GridManagerAdapter<EventStorageSpi>
if (dataBag.isJoiningNodeClient() &&
dataBag.commonDataCollectedFor(EVENT_MGR.ordinal()))
return;
- int[] clusterData = enabledEvents();
-
- dataBag.addGridCommonData(EVENT_MGR.ordinal(), clusterData);
+ dataBag.addGridCommonData(EVENT_MGR.ordinal(), new
EventsDataBagItem(enabledEvents()));
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/AuthentificationDataBagItem.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/AuthentificationDataBagItem.java
new file mode 100644
index 00000000000..2ce6d24940b
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/AuthentificationDataBagItem.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.authentication;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/** Initial data is collected on coordinator to send to join node. */
+public class AuthentificationDataBagItem implements Message {
+ /** Users. */
+ @GridToStringInclude
+ @Order(0)
+ ArrayList<User> usrs;
+
+ /** Active user operations. */
+ @GridToStringInclude
+ @Order(1)
+ ArrayList<UserManagementOperation> activeOps;
+
+ /** */
+ public AuthentificationDataBagItem() { }
+
+ /**
+ * @param usrs Users.
+ * @param ops Active operations on cluster.
+ */
+ AuthentificationDataBagItem(Collection<User> usrs,
Collection<UserManagementOperation> ops) {
+ this.usrs = new ArrayList<>(usrs);
+ activeOps = new ArrayList<>(ops);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(AuthentificationDataBagItem.class, this);
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java
index af9baf67bf3..55a5c22f2a8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java
@@ -60,7 +60,6 @@ import
org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.thread.pool.IgniteThreadPoolExecutor;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -141,7 +140,7 @@ public class IgniteAuthenticationProcessor extends
GridProcessorAdapter implemen
private UserManagementOperationFinishedMessage curOpFinishMsg;
/** Initial users map and operations received from coordinator on the node
joined to the cluster. */
- private InitialUsersData initUsrs;
+ private AuthentificationDataBagItem initUsrs;
/** I/O message listener. */
private GridMessageListener ioLsnr;
@@ -403,7 +402,7 @@ public class IgniteAuthenticationProcessor extends
GridProcessorAdapter implemen
synchronized (mux) {
if (!dataBag.commonDataCollectedFor(AUTH_PROC.ordinal())) {
- InitialUsersData d = new InitialUsersData(users.values(),
activeOps.values());
+ AuthentificationDataBagItem d = new
AuthentificationDataBagItem(users.values(), activeOps.values());
if (log.isDebugEnabled())
log.debug("Collected initial users data: " + d);
@@ -430,7 +429,7 @@ public class IgniteAuthenticationProcessor extends
GridProcessorAdapter implemen
/** {@inheritDoc} */
@Override public void
onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
- initUsrs = (InitialUsersData)data.commonData();
+ initUsrs = data.commonData();
}
/** {@inheritDoc} */
@@ -999,36 +998,6 @@ public class IgniteAuthenticationProcessor extends
GridProcessorAdapter implemen
}
/**
- * Initial data is collected on coordinator to send to join node.
- */
- private static final class InitialUsersData implements Serializable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Users. */
- @GridToStringInclude
- private final ArrayList<User> usrs;
-
- /** Active user operations. */
- @GridToStringInclude
- private final ArrayList<UserManagementOperation> activeOps;
-
- /**
- * @param usrs Users.
- * @param ops Active operations on cluster.
- */
- InitialUsersData(Collection<User> usrs,
Collection<UserManagementOperation> ops) {
- this.usrs = new ArrayList<>(usrs);
- activeOps = new ArrayList<>(ops);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(InitialUsersData.class, this);
- }
- }
-
- /**i
*
*/
private final class UserProposedListener implements
CustomEventListener<UserProposedMessage> {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserManagementOperation.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserManagementOperation.java
index 82d3b939bf5..da6efb8ad9e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserManagementOperation.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/UserManagementOperation.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.authentication;
-import java.io.Serializable;
import java.util.Objects;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -27,10 +26,7 @@ import
org.apache.ignite.plugin.extensions.communication.Message;
/**
* The operation with users. Used to deliver the information about requested
operation to all server nodes.
*/
-public class UserManagementOperation implements Serializable, Message {
- /** */
- private static final long serialVersionUID = 0L;
-
+public class UserManagementOperation implements Message {
/** User. */
@Order(0)
User usr;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 987be04ea2f..d5c7d8dce25 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -1504,10 +1504,9 @@ public class ClusterCachesInfo {
if (data.commonData() == null)
return;
- assert joinDiscoData != null || disconnectedState();
- assert data.commonData() instanceof CacheNodeCommonDiscoveryData :
data;
+ CacheNodeCommonDiscoveryData cachesData = data.commonData();
- CacheNodeCommonDiscoveryData cachesData =
(CacheNodeCommonDiscoveryData)data.commonData();
+ assert joinDiscoData != null || disconnectedState();
// CacheGroup configurations that were created from local node
configuration.
Map<Integer, CacheGroupDescriptor> locCacheGrps = new
HashMap<>(registeredCacheGroups());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionsData.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheBinaryDataBagItem.java
similarity index 77%
copy from
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionsData.java
copy to
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheBinaryDataBagItem.java
index 706d2c26f1c..5c70e5439b7 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionsData.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheBinaryDataBagItem.java
@@ -22,18 +22,18 @@ import org.apache.ignite.internal.Order;
import org.apache.ignite.plugin.extensions.communication.Message;
/** */
-public class BinaryMetadataVersionsData implements Message {
+public class CacheBinaryDataBagItem implements Message {
/** */
@Order(0)
- Map<Integer, BinaryMetadataVersionInfo> data;
+ Map<Integer, BinaryMetadataVersionInfo> meta;
/** */
- public BinaryMetadataVersionsData() {}
+ public CacheBinaryDataBagItem() {}
/**
- * @param data Data.
+ * @param meta Per-type binary metadata info.
*/
- public BinaryMetadataVersionsData(Map<Integer, BinaryMetadataVersionInfo>
data) {
- this.data = Map.copyOf(data);
+ public CacheBinaryDataBagItem(Map<Integer, BinaryMetadataVersionInfo>
meta) {
+ this.meta = Map.copyOf(meta);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 8867e576ec8..2913263ed9d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache.binary;
import java.io.File;
-import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
@@ -1418,11 +1417,11 @@ public class CacheObjectBinaryProcessorImpl extends
GridProcessorAdapter impleme
}
/** */
- private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId,
BinaryMetadataVersionsData newNodeMeta) {
- if (newNodeMeta == null)
+ private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId,
CacheBinaryDataBagItem cacheBinaryItem) {
+ if (cacheBinaryItem == null)
return null;
- for (Map.Entry<Integer, BinaryMetadataVersionInfo> metaEntry :
newNodeMeta.data.entrySet()) {
+ for (Map.Entry<Integer, BinaryMetadataVersionInfo> metaEntry :
cacheBinaryItem.meta.entrySet()) {
if (!metadataLocCache.containsKey(metaEntry.getKey()))
continue;
@@ -1464,25 +1463,25 @@ public class CacheObjectBinaryProcessorImpl extends
GridProcessorAdapter impleme
res.put(e.getKey(), e.getValue());
}
- dataBag.addGridCommonData(BINARY_PROC.ordinal(),
(Serializable)res);
+ dataBag.addGridCommonData(BINARY_PROC.ordinal(), new
CacheBinaryDataBagItem(res));
}
}
/** {@inheritDoc} */
@Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
- dataBag.addJoiningNodeData(BINARY_PROC.ordinal(), new
BinaryMetadataVersionsData(metadataLocCache));
+ dataBag.addJoiningNodeData(BINARY_PROC.ordinal(), new
CacheBinaryDataBagItem(metadataLocCache));
}
/** {@inheritDoc} */
@Override public void
onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
- BinaryMetadataVersionsData newNodeMeta = data.joiningNodeData();
+ CacheBinaryDataBagItem cacheBinaryItem = data.joiningNodeData();
- if (newNodeMeta == null)
+ if (cacheBinaryItem == null)
return;
UUID joiningNode = data.joiningNodeId();
- for (Map.Entry<Integer, BinaryMetadataVersionInfo> metaEntry :
newNodeMeta.data.entrySet()) {
+ for (Map.Entry<Integer, BinaryMetadataVersionInfo> metaEntry :
cacheBinaryItem.meta.entrySet()) {
if (metadataLocCache.containsKey(metaEntry.getKey())) {
BinaryMetadataVersionInfo locMetaVerInfo =
metadataLocCache.get(metaEntry.getKey());
@@ -1530,10 +1529,10 @@ public class CacheObjectBinaryProcessorImpl extends
GridProcessorAdapter impleme
/** {@inheritDoc} */
@Override public void onGridDataReceived(GridDiscoveryData data) {
- Map<Integer, BinaryMetadataVersionInfo> receivedData = (Map<Integer,
BinaryMetadataVersionInfo>)data.commonData();
+ CacheBinaryDataBagItem cacheBinaryItem = data.commonData();
- if (receivedData != null) {
- for (Map.Entry<Integer, BinaryMetadataVersionInfo> e :
receivedData.entrySet()) {
+ if (cacheBinaryItem != null && !F.isEmpty(cacheBinaryItem.meta)) {
+ for (Map.Entry<Integer, BinaryMetadataVersionInfo> e :
cacheBinaryItem.meta.entrySet()) {
BinaryMetadataVersionInfo metaVerInfo = e.getValue();
BinaryMetadataVersionInfo locMetaVerInfo = new
BinaryMetadataVersionInfo(metaVerInfo.metadata(),
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterIdAndTag.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterIdAndTag.java
index 2b03f36377d..17c49f2f07e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterIdAndTag.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterIdAndTag.java
@@ -20,20 +20,27 @@ package org.apache.ignite.internal.processors.cluster;
import java.io.Serializable;
import java.util.Objects;
import java.util.UUID;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
/**
* Container class to send cluster ID and tag in disco data and to write them
atomically to metastorage.
*/
-public class ClusterIdAndTag implements Serializable {
+public class ClusterIdAndTag implements Serializable, Message {
/** */
private static final long serialVersionUID = 0L;
/** */
- private final UUID id;
+ @Order(0)
+ UUID id;
/** */
- private final String tag;
+ @Order(1)
+ String tag;
+
+ /** */
+ public ClusterIdAndTag() { }
/**
* @param id Cluster ID.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index b7d78a906a5..3b75f2baf09 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cluster;
import java.io.Serializable;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
@@ -76,6 +75,7 @@ import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.metric.MetricRegistry;
import org.apache.ignite.mxbean.IgniteClusterMXBean;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
@@ -100,9 +100,6 @@ import static
org.apache.ignite.internal.util.lang.ClusterNodeFunc.nodeConsisten
*
*/
public class ClusterProcessor extends GridProcessorAdapter implements
DistributedMetastorageLifecycleListener {
- /** */
- private static final String ATTR_UPDATE_NOTIFIER_STATUS =
"UPDATE_NOTIFIER_STATUS";
-
/** */
private static final String CLUSTER_ID_TAG_KEY =
DistributedMetaStorage.IGNITE_INTERNAL_KEY_PREFIX + "cluster.id.tag";
@@ -465,42 +462,31 @@ public class ClusterProcessor extends
GridProcessorAdapter implements Distribute
/** {@inheritDoc} */
@Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
- dataBag.addJoiningNodeData(CLUSTER_PROC.ordinal(), getDiscoveryData());
+ dataBag.addJoiningNodeData(CLUSTER_PROC.ordinal(), new
ClusterUpdateNotifierDataBagItem(notifyEnabled.get()));
}
/** {@inheritDoc} */
@Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
- dataBag.addNodeSpecificData(CLUSTER_PROC.ordinal(),
getDiscoveryData());
-
- dataBag.addGridCommonData(CLUSTER_PROC.ordinal(), new
ClusterIdAndTag(cluster.id(), cluster.tag()));
- }
-
- /**
- * @return Discovery data.
- */
- private Serializable getDiscoveryData() {
- HashMap<String, Object> map = new HashMap<>(2);
-
- map.put(ATTR_UPDATE_NOTIFIER_STATUS, notifyEnabled.get());
+ dataBag.addNodeSpecificData(CLUSTER_PROC.ordinal(), new
ClusterUpdateNotifierDataBagItem(notifyEnabled.get()));
- return map;
+ dataBag.addGridCommonData(CLUSTER_PROC.ordinal(), (Message)new
ClusterIdAndTag(cluster.id(), cluster.tag()));
}
/** {@inheritDoc} */
@Override public void onGridDataReceived(GridDiscoveryData data) {
- Map<UUID, Serializable> nodeSpecData = data.nodeSpecificData();
+ Map<UUID, ClusterUpdateNotifierDataBagItem> updateNotifierData =
data.nodeSpecificData();
- if (nodeSpecData != null) {
- Boolean lstFlag = findLastFlag(nodeSpecData.values());
+ if (updateNotifierData != null) {
+ Boolean updateNotifierEnabled =
findLastUpdateNotifierStatus(updateNotifierData.values());
- if (lstFlag != null)
- notifyEnabled.set(lstFlag);
+ if (updateNotifierEnabled != null)
+ notifyEnabled.set(updateNotifierEnabled);
}
- ClusterIdAndTag commonData = (ClusterIdAndTag)data.commonData();
+ ClusterIdAndTag idAndTag = data.commonData();
- if (commonData != null) {
- Serializable remoteClusterId = commonData.id();
+ if (idAndTag != null) {
+ UUID remoteClusterId = idAndTag.id();
if (remoteClusterId != null) {
if (locClusterId != null &&
!locClusterId.equals(remoteClusterId)) {
@@ -510,10 +496,10 @@ public class ClusterProcessor extends
GridProcessorAdapter implements Distribute
", local cluster ID: " + locClusterId);
}
- locClusterId = (UUID)remoteClusterId;
+ locClusterId = remoteClusterId;
}
- String remoteClusterTag = commonData.tag();
+ String remoteClusterTag = idAndTag.tag();
if (remoteClusterTag != null)
locClusterTag = remoteClusterTag;
@@ -521,21 +507,17 @@ public class ClusterProcessor extends
GridProcessorAdapter implements Distribute
}
/**
- * @param vals collection to seek through.
+ * @param notifierItems Collection of update notifiers statuses to seek
through.
*/
- private Boolean findLastFlag(Collection<Serializable> vals) {
- Boolean flag = null;
-
- for (Serializable ser : vals) {
- if (ser != null) {
- Map<String, Object> map = (Map<String, Object>)ser;
+ private Boolean
findLastUpdateNotifierStatus(Collection<ClusterUpdateNotifierDataBagItem>
notifierItems) {
+ Boolean updateNotifierEnabled = null;
- if (map.containsKey(ATTR_UPDATE_NOTIFIER_STATUS))
- flag = (Boolean)map.get(ATTR_UPDATE_NOTIFIER_STATUS);
- }
+ for (ClusterUpdateNotifierDataBagItem notifierItem : notifierItems) {
+ if (notifierItem != null)
+ updateNotifierEnabled = notifierItem.notifierEnabled;
}
- return flag;
+ return updateNotifierEnabled;
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionsData.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterUpdateNotifierDataBagItem.java
similarity index 69%
rename from
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionsData.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterUpdateNotifierDataBagItem.java
index 706d2c26f1c..a33ff48ba4e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataVersionsData.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterUpdateNotifierDataBagItem.java
@@ -15,25 +15,22 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.cache.binary;
+package org.apache.ignite.internal.processors.cluster;
-import java.util.Map;
import org.apache.ignite.internal.Order;
import org.apache.ignite.plugin.extensions.communication.Message;
/** */
-public class BinaryMetadataVersionsData implements Message {
- /** */
+public class ClusterUpdateNotifierDataBagItem implements Message {
+ /** Update notifier enabled status. */
@Order(0)
- Map<Integer, BinaryMetadataVersionInfo> data;
+ boolean notifierEnabled;
/** */
- public BinaryMetadataVersionsData() {}
+ public ClusterUpdateNotifierDataBagItem() { }
- /**
- * @param data Data.
- */
- public BinaryMetadataVersionsData(Map<Integer, BinaryMetadataVersionInfo>
data) {
- this.data = Map.copyOf(data);
+ /** @param notifierEnabled Update notifier enabled status. */
+ public ClusterUpdateNotifierDataBagItem(boolean notifierEnabled) {
+ this.notifierEnabled = notifierEnabled;
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 337b4f51a70..3d9d20faf53 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -965,14 +965,16 @@ public class GridClusterStateProcessor extends
GridProcessorAdapter implements I
/** {@inheritDoc} */
@Override public void
onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
- if (data.commonData() instanceof DiscoveryDataClusterState) {
+ Serializable commonData = data.commonData();
+
+ if (commonData instanceof DiscoveryDataClusterState) {
if (globalState != null && globalState.baselineTopology() != null)
//node with BaselineTopology is not allowed to join mixed
cluster
// (where some nodes don't support BaselineTopology)
throw new IgniteException("Node with BaselineTopology cannot
join" +
" mixed cluster running in compatibility mode");
- globalState = (DiscoveryDataClusterState)data.commonData();
+ globalState = (DiscoveryDataClusterState)commonData;
compatibilityMode = true;
@@ -981,7 +983,7 @@ public class GridClusterStateProcessor extends
GridProcessorAdapter implements I
return;
}
- BaselineStateAndHistoryData stateDiscoData =
(BaselineStateAndHistoryData)data.commonData();
+ BaselineStateAndHistoryData stateDiscoData =
(BaselineStateAndHistoryData)commonData;
if (stateDiscoData != null) {
DiscoveryDataClusterState state = stateDiscoData.globalState;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 489f13e71fe..7291e631c2f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -528,8 +528,7 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
@Override public void onGridDataReceived(GridDiscoveryData data) {
if (immutableDiscoCustomMsg) {
if (data.commonData() != null) {
- ContinuousRoutinesCommonDiscoveryData commonData =
- (ContinuousRoutinesCommonDiscoveryData)data.commonData();
+ ContinuousRoutinesCommonDiscoveryData commonData =
data.commonData();
for (ContinuousRoutineInfo routineInfo :
commonData.startedRoutines) {
if (routinesInfo.routineExists(routineInfo.routineId))
@@ -542,11 +541,11 @@ public class GridContinuousProcessor extends
GridProcessorAdapter {
}
}
else {
- Map<UUID, Serializable> nodeSpecData = data.nodeSpecificData();
+ Map<UUID, DiscoveryData> nodeSpecData = data.nodeSpecificData();
if (nodeSpecData != null) {
- for (Map.Entry<UUID, Serializable> e : nodeSpecData.entrySet())
-
onDiscoveryDataReceivedMutable((DiscoveryData)e.getValue());
+ for (DiscoveryData val : nodeSpecData.values())
+ onDiscoveryDataReceivedMutable(val);
}
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index 8946672364e..374ea7fe518 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@ -326,34 +326,32 @@ public class GridMarshallerMappingProcessor extends
GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
- dataBag.addJoiningNodeData(MARSHALLER_PROC.ordinal(), new
MarshallerMappingsData(marshallerCtx.getCachedMappings()));
+ dataBag.addJoiningNodeData(MARSHALLER_PROC.ordinal(), new
MarshallerDataBagItem(marshallerCtx.getCachedMappings()));
}
/** {@inheritDoc} */
@Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
if (!dataBag.commonDataCollectedFor(MARSHALLER_PROC.ordinal()))
- dataBag.addGridCommonData(MARSHALLER_PROC.ordinal(),
marshallerCtx.getCachedMappings());
+ dataBag.addGridCommonData(MARSHALLER_PROC.ordinal(),
+ new MarshallerDataBagItem(marshallerCtx.getCachedMappings()));
}
/** {@inheritDoc} */
@Override public void
onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
- MarshallerMappingsData mappingsData = data.joiningNodeData();
-
- processIncomingMappings(mappingsData.mappings);
+ processIncomingMappings(data.joiningNodeData());
}
/** {@inheritDoc} */
@Override public void onGridDataReceived(GridDiscoveryData data) {
- List<Map<Integer, MappedName>> mappings = (List<Map<Integer,
MappedName>>)data.commonData();
-
- processIncomingMappings(mappings);
+ processIncomingMappings(data.commonData());
}
/**
- * @param mappings Incoming marshaller mappings.
+ * @param marshallerItem Incoming marshaller mappings wrapper.
*/
- private void processIncomingMappings(List<Map<Integer, MappedName>>
mappings) {
- marshallerCtx.onMappingDataReceived(log, mappings);
+ private void processIncomingMappings(@Nullable MarshallerDataBagItem
marshallerItem) {
+ if (marshallerItem != null)
+ marshallerCtx.onMappingDataReceived(log,
marshallerItem.mappings());
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerDataBagItem.java
similarity index 78%
rename from
modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerDataBagItem.java
index 2207b1c21f4..9b9a0d8b21c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingsData.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerDataBagItem.java
@@ -23,18 +23,21 @@ import org.apache.ignite.internal.Order;
import org.apache.ignite.plugin.extensions.communication.Message;
/** */
-public class MarshallerMappingsData implements Message {
+public class MarshallerDataBagItem implements Message {
/** */
@Order(0)
List<Map<Integer, MappedName>> mappings;
/** */
- public MarshallerMappingsData() {}
+ public MarshallerDataBagItem() {}
- /**
- * @param mappings Mappings.
- */
- public MarshallerMappingsData(List<Map<Integer, MappedName>> mappings) {
+ /** @param mappings Mappings. */
+ public MarshallerDataBagItem(List<Map<Integer, MappedName>> mappings) {
this.mappings = mappings;
}
+
+ /** @return Mappings. */
+ public List<Map<Integer, MappedName>> mappings() {
+ return mappings;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
index 1362fec2e7f..ad9022f2e97 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
@@ -958,7 +958,7 @@ public class DistributedMetaStorageImpl extends
GridProcessorAdapter
lock.writeLock().lock();
try {
- DistributedMetaStorageClusterNodeData nodeData =
(DistributedMetaStorageClusterNodeData)data.commonData();
+ DistributedMetaStorageClusterNodeData nodeData = data.commonData();
if (nodeData != null) {
if (nodeData.fullData != null) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
index 804f5bf743a..993db181dbc 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
@@ -166,24 +166,24 @@ public class IgnitePluginProcessor extends
GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
- Serializable pluginsData = getDiscoveryData(dataBag.joiningNodeId());
+ PluginsDataBagItem pluginsItem =
itemForDataBag(dataBag.joiningNodeId());
- if (pluginsData != null)
- dataBag.addJoiningNodeData(PLUGIN.ordinal(), pluginsData);
+ if (!F.isEmpty(pluginsItem.data))
+ dataBag.addJoiningNodeData(PLUGIN.ordinal(), pluginsItem);
}
/** {@inheritDoc} */
@Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
- Serializable pluginsData = getDiscoveryData(dataBag.joiningNodeId());
+ PluginsDataBagItem pluginsItem =
itemForDataBag(dataBag.joiningNodeId());
- if (pluginsData != null)
- dataBag.addNodeSpecificData(PLUGIN.ordinal(), pluginsData);
+ if (!F.isEmpty(pluginsItem.data))
+ dataBag.addNodeSpecificData(PLUGIN.ordinal(), pluginsItem);
}
/**
* @param joiningNodeId Joining node id.
*/
- private Serializable getDiscoveryData(UUID joiningNodeId) {
+ private PluginsDataBagItem itemForDataBag(UUID joiningNodeId) {
HashMap<String, Serializable> pluginsData = null;
for (Map.Entry<String, PluginProvider> e : plugins.entrySet()) {
@@ -197,31 +197,27 @@ public class IgnitePluginProcessor extends
GridProcessorAdapter {
}
}
- return pluginsData;
+ return new PluginsDataBagItem(pluginsData);
}
/** {@inheritDoc} */
@Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData
data) {
- if (data.hasJoiningNodeData()) {
- Map<String, Serializable> pluginsData = data.joiningNodeData();
+ PluginsDataBagItem pluginsItem = data.joiningNodeData();
- applyPluginsData(data.joiningNodeId(), pluginsData);
- }
+ if (pluginsItem != null && !F.isEmpty(pluginsItem.data))
+ applyPluginsData(data.joiningNodeId(), pluginsItem.data);
}
/** {@inheritDoc} */
@Override public void onGridDataReceived(GridDiscoveryData data) {
- Map<UUID, Serializable> nodeSpecificData = data.nodeSpecificData();
+ Map<UUID, PluginsDataBagItem> nodeSpecificData =
data.nodeSpecificData();
if (nodeSpecificData != null) {
UUID joiningNodeId = data.joiningNodeId();
- for (Serializable v : nodeSpecificData.values()) {
- if (v != null) {
- Map<String, Serializable> pluginsData = (Map<String,
Serializable>)v;
-
- applyPluginsData(joiningNodeId, pluginsData);
- }
+ for (PluginsDataBagItem pluginsItem : nodeSpecificData.values()) {
+ if (pluginsItem != null && !F.isEmpty(pluginsItem.data))
+ applyPluginsData(joiningNodeId, pluginsItem.data);
}
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/PluginsDataBagItem.java
similarity index 58%
copy from
modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java
copy to
modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/PluginsDataBagItem.java
index f9da59bffe4..9b70e422237 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/PluginsDataBagItem.java
@@ -15,68 +15,49 @@
* limitations under the License.
*/
-package org.apache.ignite.spi.discovery;
+package org.apache.ignite.internal.processors.plugin;
import java.io.Serializable;
+import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.MarshallableMessage;
import org.apache.ignite.internal.Order;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
-/** Wrapper message for serializable data. */
-public class ObjectData implements MarshallableMessage {
- /** */
- @GridToStringInclude
- private Serializable data;
+/** */
+public class PluginsDataBagItem implements MarshallableMessage {
+ /** Original plugins data. */
+ @Nullable Map<String, Serializable> data;
- /** */
- @GridToStringExclude
+ /** Serialized plugins data. */
@Order(0)
- byte[] dataBytes;
+ @Nullable byte[] dataBytes;
/** */
- public ObjectData() {}
+ public PluginsDataBagItem() { }
- /**
- * @param data Original data.
- */
- public ObjectData(Serializable data) {
+ /** @param data Plugins data. */
+ public PluginsDataBagItem(@Nullable Map<String, Serializable> data) {
this.data = data;
}
/** {@inheritDoc} */
@Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
- if (data != null)
+ if (!F.isEmpty(data))
dataBytes = U.marshal(marsh, data);
}
/** {@inheritDoc} */
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader
clsLdr) throws IgniteCheckedException {
- if (dataBytes != null) {
+ if (dataBytes != null)
data = U.unmarshal(marsh, dataBytes, clsLdr);
-
- dataBytes = null;
- }
}
- /**
- * @param msg Message.
- * @param <T> Type of data.
- *
- * @return Original data unwrapped from a message.
- */
- public static <T> T unwrap(@Nullable Message msg) {
- return msg != null ? (T)(((ObjectData)msg).data) : null;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(ObjectData.class, this);
+ /** @return Original plugins data. */
+ public @Nullable Map<String, Serializable> data() {
+ return data;
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 854e8a17e58..97cde3ad4ef 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.query;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -118,6 +117,8 @@ import
org.apache.ignite.internal.processors.query.schema.SchemaOperationManager
import
org.apache.ignite.internal.processors.query.schema.SchemaOperationWorker;
import org.apache.ignite.internal.processors.query.schema.SchemaSqlViewManager;
import
org.apache.ignite.internal.processors.query.schema.management.SchemaManager;
+import
org.apache.ignite.internal.processors.query.schema.message.QueryInlineSizesDataBagItem;
+import
org.apache.ignite.internal.processors.query.schema.message.QueryProposalsDataBagItem;
import
org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage;
import
org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
import
org.apache.ignite.internal.processors.query.schema.message.SchemaOperationStatusMessage;
@@ -181,9 +182,6 @@ import static
org.apache.ignite.internal.processors.query.schema.SchemaOperation
*/
@SuppressWarnings("rawtypes")
public class GridQueryProcessor extends GridProcessorAdapter {
- /** */
- private static final String INLINE_SIZES_DISCO_BAG_KEY = "inline_sizes";
-
/** Warn message if some indexes have different inline sizes on the nodes.
*/
public static final String INLINE_SIZES_DIFFER_WARN_MSG_FORMAT = "Inline
sizes on local node and node %s are different. " +
"Please drop and create again these indexes to avoid performance
problems with SQL queries. Problem indexes: %s";
@@ -473,71 +471,60 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
- LinkedHashMap<UUID, SchemaProposeDiscoveryMessage> proposals;
+ QueryProposalsDataBagItem proposalsItem;
// Collect active proposals.
synchronized (stateMux) {
- proposals = new LinkedHashMap<>(activeProposals);
+ proposalsItem = new QueryProposalsDataBagItem(new
LinkedHashMap<>(activeProposals));
}
-
dataBag.addGridCommonData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(),
proposals);
+
dataBag.addGridCommonData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(),
proposalsItem);
// We should send inline index sizes information only to server nodes.
if (!dataBag.isJoiningNodeClient()) {
- HashMap<String, Serializable> nodeSpecificMap = new HashMap<>();
-
- Serializable oldVal =
nodeSpecificMap.put(INLINE_SIZES_DISCO_BAG_KEY,
collectSecondaryIndexesInlineSize());
-
- assert oldVal == null : oldVal;
-
-
dataBag.addNodeSpecificData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(),
nodeSpecificMap);
+
dataBag.addNodeSpecificData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(),
+ new QueryInlineSizesDataBagItem(secondaryIndexesInlineSize()));
}
}
/** {@inheritDoc} */
@Override public void
onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
- Object joiningNodeData = data.joiningNodeData();
-
- if (joiningNodeData instanceof InlineSizesData) {
- Map<String, Integer> joiningNodeIndexesInlineSize =
((InlineSizesData)joiningNodeData).sizes;
+ QueryInlineSizesDataBagItem inlineSizesItem = data.joiningNodeData();
- checkInlineSizes(secondaryIndexesInlineSize(),
joiningNodeIndexesInlineSize, data.joiningNodeId());
- }
+ if (inlineSizesItem != null)
+ checkInlineSizes(secondaryIndexesInlineSize(),
inlineSizesItem.sizes(), data.joiningNodeId());
}
/** {@inheritDoc} */
@Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
dataBag.addJoiningNodeData(DiscoveryDataExchangeType.QUERY_PROC.ordinal(),
- new InlineSizesData(secondaryIndexesInlineSize()));
+ new QueryInlineSizesDataBagItem(secondaryIndexesInlineSize()));
}
/** {@inheritDoc} */
@Override public void
onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
// Preserve proposals.
- LinkedHashMap<UUID, SchemaProposeDiscoveryMessage> activeProposals =
- (LinkedHashMap<UUID,
SchemaProposeDiscoveryMessage>)data.commonData();
+ QueryProposalsDataBagItem proposalsItem = data.commonData();
// Process proposals as if they were received as regular discovery
messages.
- if (!F.isEmpty(activeProposals)) {
+ if (proposalsItem != null &&
!F.isEmpty(proposalsItem.activeProposals())) {
synchronized (stateMux) {
- for (SchemaProposeDiscoveryMessage activeProposal :
activeProposals.values())
+ for (SchemaProposeDiscoveryMessage activeProposal :
proposalsItem.activeProposals().values())
onSchemaProposeDiscovery0(activeProposal);
}
}
- if (!F.isEmpty(data.nodeSpecificData())) {
+ Map<UUID, QueryInlineSizesDataBagItem> nodedSpecificData =
data.nodeSpecificData();
+
+ if (!F.isEmpty(nodedSpecificData)) {
Map<String, Integer> indexesInlineSize =
secondaryIndexesInlineSize();
if (!F.isEmpty(indexesInlineSize)) {
- for (UUID nodeId : data.nodeSpecificData().keySet()) {
- Serializable serializable =
data.nodeSpecificData().get(nodeId);
+ for (UUID nodeId : nodedSpecificData.keySet()) {
+ QueryInlineSizesDataBagItem inlineSizesItem =
nodedSpecificData.get(nodeId);
- assert serializable instanceof Map : serializable;
-
- Map<String, Serializable> nodeSpecificData = (Map<String,
Serializable>)serializable;
-
- if
(nodeSpecificData.containsKey(INLINE_SIZES_DISCO_BAG_KEY))
- checkInlineSizes(indexesInlineSize, (Map<String,
Integer>)nodeSpecificData.get(INLINE_SIZES_DISCO_BAG_KEY), nodeId);
+ if (inlineSizesItem != null)
+ checkInlineSizes(indexesInlineSize,
inlineSizesItem.sizes(), nodeId);
}
}
}
@@ -685,16 +672,6 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
}
}
- /**
- * @return Serializable information about secondary indexes inline size.
- * @see #secondaryIndexesInlineSize()
- */
- private Serializable collectSecondaryIndexesInlineSize() {
- Map<String, Integer> map = secondaryIndexesInlineSize();
-
- return map instanceof Serializable ? (Serializable)map : new
HashMap<>(map);
- }
-
/**
* Process schema propose message from discovery thread.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/QueryInlineSizesDataBagItem.java
similarity index 73%
copy from
modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java
copy to
modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/QueryInlineSizesDataBagItem.java
index eb3813501f6..f7e03ddf390 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/QueryInlineSizesDataBagItem.java
@@ -15,25 +15,28 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query;
+package org.apache.ignite.internal.processors.query.schema.message;
import java.util.Map;
import org.apache.ignite.internal.Order;
import org.apache.ignite.plugin.extensions.communication.Message;
/** */
-public class InlineSizesData implements Message {
+public class QueryInlineSizesDataBagItem implements Message {
/** */
@Order(0)
Map<String, Integer> sizes;
/** */
- public InlineSizesData() {}
+ public QueryInlineSizesDataBagItem() {}
- /**
- * @param sizes Inline sizes.
- */
- public InlineSizesData(Map<String, Integer> sizes) {
+ /** @param sizes Inline sizes. */
+ public QueryInlineSizesDataBagItem(Map<String, Integer> sizes) {
this.sizes = sizes;
}
+
+ /** @return Inline sizes. */
+ public Map<String, Integer> sizes() {
+ return sizes;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/QueryProposalsDataBagItem.java
similarity index 54%
rename from
modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/QueryProposalsDataBagItem.java
index eb3813501f6..65082c4ba41 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/InlineSizesData.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/QueryProposalsDataBagItem.java
@@ -15,25 +15,31 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query;
+package org.apache.ignite.internal.processors.query.schema.message;
-import java.util.Map;
+import java.util.LinkedHashMap;
+import java.util.UUID;
import org.apache.ignite.internal.Order;
import org.apache.ignite.plugin.extensions.communication.Message;
-/** */
-public class InlineSizesData implements Message {
- /** */
+/** Wrapper for active schema change propose discovery messages. */
+public class QueryProposalsDataBagItem implements Message {
+ /** Active proposals. */
@Order(0)
- Map<String, Integer> sizes;
+ LinkedHashMap<UUID, SchemaProposeDiscoveryMessage> activeProposals;
/** */
- public InlineSizesData() {}
+ public QueryProposalsDataBagItem() {
+ // No-op.
+ }
+
+ /** @param activeProposals Active proposals. */
+ public QueryProposalsDataBagItem(LinkedHashMap<UUID,
SchemaProposeDiscoveryMessage> activeProposals) {
+ this.activeProposals = activeProposals;
+ }
- /**
- * @param sizes Inline sizes.
- */
- public InlineSizesData(Map<String, Integer> sizes) {
- this.sizes = sizes;
+ /** @return Active proposals. */
+ public LinkedHashMap<UUID, SchemaProposeDiscoveryMessage>
activeProposals() {
+ return activeProposals;
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
index 1b832abab0f..a7914327d60 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.query.schema.message;
-import java.io.Serializable;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import
org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
@@ -30,10 +29,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Abstract discovery message for schema operations.
*/
-public abstract class SchemaAbstractDiscoveryMessage extends
DiscoveryCustomMessage implements Serializable {
- /** */
- private static final long serialVersionUID = 0L;
-
+public abstract class SchemaAbstractDiscoveryMessage extends
DiscoveryCustomMessage {
/** Operation. */
@GridToStringInclude
@Order(0)
@@ -41,11 +37,11 @@ public abstract class SchemaAbstractDiscoveryMessage
extends DiscoveryCustomMess
/** Error message. */
@Order(1)
- transient String errMsg;
+ String errMsg;
/** Error code. */
@Order(2)
- transient int errCode;
+ int errCode;
/** Error. */
SchemaOperationException err;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
index 9ff555b729a..fc95da0c2a1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
@@ -27,9 +27,6 @@ import org.jetbrains.annotations.Nullable;
* Schema change finish discovery message.
*/
public class SchemaFinishDiscoveryMessage extends
SchemaAbstractDiscoveryMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
/** No-op flag. */
@Order(0)
boolean nop;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
index 1f761c1e81e..bef0b3b2e94 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
@@ -28,9 +28,6 @@ import org.jetbrains.annotations.Nullable;
* Schema change propose discovery message.
*/
public class SchemaProposeDiscoveryMessage extends
SchemaAbstractDiscoveryMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
/** Cache deployment ID. */
@Order(0)
IgniteUuid depId;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
index e7f59549ea4..34118541ece 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
@@ -389,7 +389,7 @@ public class IgniteServiceProcessor extends
GridProcessorAdapter implements Igni
if (data.commonData() == null)
return;
- ServiceProcessorCommonDiscoveryData clusterData =
(ServiceProcessorCommonDiscoveryData)data.commonData();
+ ServiceProcessorCommonDiscoveryData clusterData = data.commonData();
for (ServiceInfo desc : clusterData.registeredServices()) {
try {
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
index 58e41738265..a2f3af751af 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.GridComponent;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
@@ -59,11 +60,17 @@ public class DiscoveryDataBag {
/** @return ID fo the joining node. */
UUID joiningNodeId();
- /** @return Common for all cluster nodes discovery data that is sent
to the joining node. */
- Serializable commonData();
+ /**
+ * @param <T> Data type.
+ * @return Common for all cluster nodes discovery data that is sent to
the joining node.
+ */
+ <T> T commonData();
- /** @return Discovery data that is mapped to the particular cluster
node and sent to the joining node. */
- Map<UUID, Serializable> nodeSpecificData();
+ /**
+ * @param <T> Data type.
+ * @return Discovery data that is mapped to the particular cluster
node and sent to the joining node.
+ */
+ <T> Map<UUID, T> nodeSpecificData();
}
/**
@@ -87,7 +94,7 @@ public class DiscoveryDataBag {
@Override @Nullable public <T> T joiningNodeData() {
Message dataMsg = joiningNodeData.get(cmpId);
- return dataMsg instanceof ObjectData ? ObjectData.unwrap(dataMsg)
: (T)dataMsg;
+ return SerializableDataBagItemWrapper.unwrapIfNecessary(dataMsg);
}
/**
@@ -106,7 +113,7 @@ public class DiscoveryDataBag {
private int cmpId;
/** */
- private Map<UUID, Serializable> nodeSpecificData
+ private Map<UUID, Message> nodeSpecificData
= new
LinkedHashMap<>(DiscoveryDataBag.this.nodeSpecificData.size());
/** {@inheritDoc} */
@@ -115,16 +122,16 @@ public class DiscoveryDataBag {
}
/** {@inheritDoc} */
- @Override @Nullable public Serializable commonData() {
+ @Override @Nullable public <T> T commonData() {
if (commonData != null)
- return commonData.get(cmpId);
+ return
SerializableDataBagItemWrapper.unwrapIfNecessary(commonData.get(cmpId));
return null;
}
/** {@inheritDoc} */
- @Override public Map<UUID, Serializable> nodeSpecificData() {
- return nodeSpecificData;
+ @Override public <T> Map<UUID, T> nodeSpecificData() {
+ return F.viewReadOnly(nodeSpecificData,
SerializableDataBagItemWrapper::unwrapIfNecessary);
}
/**
@@ -142,7 +149,7 @@ public class DiscoveryDataBag {
private void reinitNodeSpecData(int cmpId) {
nodeSpecificData.clear();
- for (Map.Entry<UUID, Map<Integer, Serializable>> e :
DiscoveryDataBag.this.nodeSpecificData.entrySet()) {
+ for (Map.Entry<UUID, Map<Integer, Message>> e :
DiscoveryDataBag.this.nodeSpecificData.entrySet()) {
if (e.getValue() != null && e.getValue().containsKey(cmpId))
nodeSpecificData.put(e.getKey(), e.getValue().get(cmpId));
}
@@ -156,7 +163,7 @@ public class DiscoveryDataBag {
private static final UUID DEFAULT_KEY = null;
/** */
- private UUID joiningNodeId;
+ private final UUID joiningNodeId;
/**
* Component IDs with already initialized common discovery data.
@@ -164,13 +171,13 @@ public class DiscoveryDataBag {
private Set<Integer> cmnDataInitializedCmps;
/** */
- private Map<Integer, Message> joiningNodeData = new HashMap<>();
+ private final Map<Integer, Message> joiningNodeData = new HashMap<>();
/** */
- private Map<Integer, Serializable> commonData = new HashMap<>();
+ private final Map<Integer, Message> commonData = new HashMap<>();
/** */
- private Map<UUID, Map<Integer, Serializable>> nodeSpecificData = new
LinkedHashMap<>();
+ private final Map<UUID, Map<Integer, Message>> nodeSpecificData = new
LinkedHashMap<>();
/** */
private JoiningNodeDiscoveryDataImpl newJoinerData;
@@ -246,7 +253,7 @@ public class DiscoveryDataBag {
* @param data Serializable data.
*/
public void addJoiningNodeData(Integer cmpId, Serializable data) {
- joiningNodeData.put(cmpId, new ObjectData(data));
+ joiningNodeData.put(cmpId, new SerializableDataBagItemWrapper(data));
}
/**
@@ -259,19 +266,35 @@ public class DiscoveryDataBag {
/**
* @param cmpId Component ID.
- * @param data Data.
+ * @param data Serializable data.
*/
public void addGridCommonData(Integer cmpId, Serializable data) {
+ commonData.put(cmpId, new SerializableDataBagItemWrapper(data));
+ }
+
+ /**
+ * @param cmpId Component ID.
+ * @param data Message data.
+ */
+ public void addGridCommonData(Integer cmpId, Message data) {
commonData.put(cmpId, data);
}
/**
* @param cmpId Component ID.
- * @param data Data.
+ * @param data Serializable data.
*/
public void addNodeSpecificData(Integer cmpId, Serializable data) {
+ addNodeSpecificData(cmpId, new SerializableDataBagItemWrapper(data));
+ }
+
+ /**
+ * @param cmpId Component ID.
+ * @param data Message data.
+ */
+ public void addNodeSpecificData(Integer cmpId, Message data) {
if (!nodeSpecificData.containsKey(DEFAULT_KEY))
- nodeSpecificData.put(DEFAULT_KEY, new HashMap<Integer,
Serializable>());
+ nodeSpecificData.put(DEFAULT_KEY, new HashMap<>());
nodeSpecificData.get(DEFAULT_KEY).put(cmpId, data);
}
@@ -296,14 +319,14 @@ public class DiscoveryDataBag {
/**
* @param cmnData Cmn data.
*/
- public void commonData(Map<Integer, Serializable> cmnData) {
+ public void commonData(Map<Integer, Message> cmnData) {
commonData.putAll(cmnData);
}
/**
* @param nodeSpecData Node specific data.
*/
- public void nodeSpecificData(Map<UUID, Map<Integer, Serializable>>
nodeSpecData) {
+ public void nodeSpecificData(Map<UUID, Map<Integer, Message>>
nodeSpecData) {
nodeSpecificData.putAll(nodeSpecData);
}
@@ -316,12 +339,12 @@ public class DiscoveryDataBag {
* @return Discovery data for each Ignite component that is aggregated
from the cluster nodes and sent to the
* joining node.
*/
- public Map<Integer, Serializable> commonData() {
+ public Map<Integer, Message> commonData() {
return commonData;
}
/** @return Discovery data that belongs to the current cluster node and is
sent to the joining node. */
- @Nullable public Map<Integer, Serializable> localNodeSpecificData() {
+ @Nullable public Map<Integer, Message> localNodeSpecificData() {
return nodeSpecificData.get(DEFAULT_KEY);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/SerializableDataBagItemWrapper.java
similarity index 64%
rename from
modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java
rename to
modules/core/src/main/java/org/apache/ignite/spi/discovery/SerializableDataBagItemWrapper.java
index f9da59bffe4..7e3bc16ed43 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/ObjectData.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/SerializableDataBagItemWrapper.java
@@ -29,8 +29,8 @@ import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
-/** Wrapper message for serializable data. */
-public class ObjectData implements MarshallableMessage {
+/** Wrapper message for serializable data in a {@link DiscoveryDataBag}. */
+public class SerializableDataBagItemWrapper implements MarshallableMessage {
/** */
@GridToStringInclude
private Serializable data;
@@ -40,16 +40,41 @@ public class ObjectData implements MarshallableMessage {
@Order(0)
byte[] dataBytes;
+ /** Unmarshalling error. */
+ IgniteCheckedException unmarshallError;
+
/** */
- public ObjectData() {}
+ public SerializableDataBagItemWrapper() {}
/**
* @param data Original data.
*/
- public ObjectData(Serializable data) {
+ public SerializableDataBagItemWrapper(Serializable data) {
this.data = data;
}
+ /**
+ * @param msg Message.
+ * @param <T> Type of data.
+ *
+ * @return Original message or data unwrapped from a
SerializableDataBagItemWrapper wrapper.
+ */
+ static @Nullable <T> T unwrapIfNecessary(@Nullable Message msg) {
+ if (msg == null)
+ return null;
+
+ return msg instanceof SerializableDataBagItemWrapper ?
((SerializableDataBagItemWrapper)msg).unwrap() : (T)msg;
+ }
+
+ /**
+ * @param <T> Type of data.
+ *
+ * @return Original data unwrapped from a message.
+ */
+ private <T> T unwrap() {
+ return (T)(data);
+ }
+
/** {@inheritDoc} */
@Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
if (data != null)
@@ -59,24 +84,24 @@ public class ObjectData implements MarshallableMessage {
/** {@inheritDoc} */
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader
clsLdr) throws IgniteCheckedException {
if (dataBytes != null) {
- data = U.unmarshal(marsh, dataBytes, clsLdr);
+ try {
+ data = U.unmarshal(marsh, dataBytes, clsLdr);
- dataBytes = null;
+ dataBytes = null;
+ }
+ catch (IgniteCheckedException e) {
+ unmarshallError = e;
+ }
}
}
- /**
- * @param msg Message.
- * @param <T> Type of data.
- *
- * @return Original data unwrapped from a message.
- */
- public static <T> T unwrap(@Nullable Message msg) {
- return msg != null ? (T)(((ObjectData)msg).data) : null;
+ /** @return Unmarshalling error. */
+ public IgniteCheckedException unmarshallError() {
+ return unmarshallError;
}
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(ObjectData.class, this);
+ return S.toString(SerializableDataBagItemWrapper.class, this);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index a8a4f2f0a47..75c24745be3 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2583,36 +2583,6 @@ class ServerImpl extends TcpDiscoveryImpl {
if (addedMsg.gridDiscoveryData() != null)
addedMsg.clearDiscoveryData();
}
- else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
- TcpDiscoveryNodeAddFinishedMessage addFinishMsg =
(TcpDiscoveryNodeAddFinishedMessage)msg;
-
- if (addFinishMsg.clientDiscoData() != null) {
- addFinishMsg = new
TcpDiscoveryNodeAddFinishedMessage(addFinishMsg);
-
- msg = addFinishMsg;
-
- DiscoveryDataPacket discoData =
addFinishMsg.clientDiscoData();
-
- Set<Integer> mrgdCmnData = new HashSet<>();
- Set<UUID> mrgdSpecData = new HashSet<>();
-
- boolean allMerged = false;
-
- for (TcpDiscoveryAbstractMessage msg0 : msgs) {
-
- if (msg0 instanceof
TcpDiscoveryNodeAddFinishedMessage) {
- DiscoveryDataPacket existingDiscoData =
-
((TcpDiscoveryNodeAddFinishedMessage)msg0).clientDiscoData();
-
- if (existingDiscoData != null)
- allMerged =
discoData.mergeDataFrom(existingDiscoData, mrgdCmnData, mrgdSpecData);
- }
-
- if (allMerged)
- break;
- }
- }
- }
else if (msg instanceof TcpDiscoveryNodeLeftMessage)
clearClientAddFinished(msg.creatorNodeId());
else if (msg instanceof TcpDiscoveryNodeFailedMessage)
@@ -4779,9 +4749,17 @@ class ServerImpl extends TcpDiscoveryImpl {
/** */
private IgniteNodeValidationResult
validateByIgniteComponentsWithJoiningNodeData(TcpDiscoveryJoinRequestMessage
req) {
- DiscoveryDataBag data =
req.gridDiscoveryData().bagWithJoiningNodeData();
+ DiscoveryDataPacket packet = req.gridDiscoveryData();
- return spi.getSpiContext().validateNode(req.node(), data);
+ try {
+ DiscoveryDataBag dataBag =
packet.bagWithJoiningNodeData(spi.ignite().log(),
+ spi.ignite().configuration().isClientMode());
+
+ return spi.getSpiContext().validateNode(req.node(), dataBag);
+ }
+ catch (IgniteCheckedException e) {
+ return new IgniteNodeValidationResult(req.node().id(),
e.getMessage());
+ }
}
/** */
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index a7f68fb1cf5..eb868103ad1 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -2068,21 +2068,14 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
assert dataPacket != null;
assert dataPacket.joiningNodeId() != null;
- //create data bag, pass it to exchange.collect
DiscoveryDataBag dataBag = dataPacket.bagForDataCollection();
exchange.collect(dataBag);
- //marshall collected bag into packet, return packet
if (dataPacket.joiningNodeId().equals(locNode.id()))
dataPacket.addJoiningNodeData(dataBag);
else
- dataPacket.marshalGridNodeData(
- dataBag,
- locNode.id(),
- marshaller(),
- ignite.configuration().getNetworkCompressionLevel(),
- log);
+ dataPacket.addNodeData(dataBag, locNode.id());
return dataPacket;
}
@@ -2097,22 +2090,21 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter
implements IgniteDiscovery
DiscoveryDataBag dataBag;
- if (dataPacket.joiningNodeId().equals(locNode.id())) {
- try {
- dataBag = dataPacket.unmarshalGridData(marshaller(), clsLdr,
locNode.clientRouterNodeId() != null, log);
- }
- catch (IgniteCheckedException e) {
- if (ignite() instanceof IgniteEx) {
- FailureProcessor failure =
((IgniteEx)ignite()).context().failure();
-
- failure.process(new FailureContext(CRITICAL_ERROR, e));
- }
+ try {
+ if (dataPacket.joiningNodeId().equals(locNode.id()))
+ dataBag = dataPacket.bagWithNodeData(ignite.log(),
ignite.configuration().isClientMode());
+ else
+ dataBag = dataPacket.bagWithJoiningNodeData(ignite.log(),
ignite.configuration().isClientMode());
+ }
+ catch (IgniteCheckedException e) {
+ if (ignite() instanceof IgniteEx) {
+ FailureProcessor failure =
((IgniteEx)ignite()).context().failure();
- throw new IgniteException(e);
+ failure.process(new FailureContext(CRITICAL_ERROR, e));
}
+
+ throw new IgniteException(e);
}
- else
- dataBag = dataPacket.bagWithJoiningNodeData();
exchange.onExchange(dataBag);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java
index 4ded165df70..cd1e8c10da3 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java
@@ -16,9 +16,6 @@
*/
package org.apache.ignite.spi.discovery.tcp.internal;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -31,23 +28,19 @@ import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.SerializableDataBagItemWrapper;
+import org.jetbrains.annotations.Nullable;
+import static java.lang.Boolean.TRUE;
import static
org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC;
/**
- * Carries discovery data in marshalled form
+ * Carries discovery data in the form of {@link Message}
* and allows convenient way of converting it to and from {@link
DiscoveryDataBag} objects.
*/
-public class DiscoveryDataPacket implements Serializable, Message {
- /** Local file header signature (read as a little-endian number). */
- private static final int ZIP_HEADER_SIGNATURE = 0x04034b50;
-
- /** */
- private static final long serialVersionUID = 0L;
-
+public class DiscoveryDataPacket implements Message {
/** */
@Order(0)
UUID joiningNodeId;
@@ -59,14 +52,19 @@ public class DiscoveryDataPacket implements Serializable,
Message {
/** */
@Order(2)
- Map<Integer, byte[]> commonData = new HashMap<>();
+ @Compress
+ Map<Integer, Message> commonData = new HashMap<>();
/** */
@Order(3)
- Map<UUID, Map<Integer, byte[]>> nodeSpecificData = new HashMap<>();
+ @Compress
+ Map<UUID, Map<Integer, Message>> nodeSpecificData = new HashMap<>();
/** */
- private transient boolean joiningNodeClient;
+ private boolean joiningNodeClient;
+
+ /** Unmarshalling error, if any. */
+ private IgniteCheckedException unmarshErr;
/** Constructor. */
public DiscoveryDataPacket() {
@@ -90,25 +88,15 @@ public class DiscoveryDataPacket implements Serializable,
Message {
/**
* @param bag Bag.
* @param nodeId Node id.
- * @param marsh Marsh.
- * @param log Logger.
*/
- public void marshalGridNodeData(DiscoveryDataBag bag, UUID nodeId,
Marshaller marsh,
- int compressionLevel, IgniteLogger log) {
- marshalData(bag.commonData(), commonData, marsh, compressionLevel,
log);
-
- Map<Integer, Serializable> locNodeSpecificData =
bag.localNodeSpecificData();
-
- if (locNodeSpecificData != null) {
- Map<Integer, byte[]> marshLocNodeSpecificData =
U.newHashMap(locNodeSpecificData.size());
-
- marshalData(locNodeSpecificData, marshLocNodeSpecificData, marsh,
compressionLevel, log);
+ public void addNodeData(DiscoveryDataBag bag, UUID nodeId) {
+ if (bag.commonData() != null)
+ commonData.putAll(bag.commonData());
- filterDuplicatedData(marshLocNodeSpecificData);
+ Map<Integer, Message> locNodeSpecificData =
bag.localNodeSpecificData();
- if (!marshLocNodeSpecificData.isEmpty())
- nodeSpecificData.put(nodeId, marshLocNodeSpecificData);
- }
+ if (!F.isEmpty(locNodeSpecificData))
+ nodeSpecificData.put(nodeId, locNodeSpecificData);
}
/**
@@ -120,47 +108,34 @@ public class DiscoveryDataPacket implements Serializable,
Message {
}
/**
- * @param marsh Marsh.
- * @param clsLdr Class loader.
- * @param clientNode Client node.
- * @param log Logger.
+ * @param log Ignite logger.
+ * @param client Client mode flag.
+ *
+ * @return Data bag with node data.
*/
- public DiscoveryDataBag unmarshalGridData(
- Marshaller marsh,
- ClassLoader clsLdr,
- boolean clientNode,
- IgniteLogger log
- ) throws IgniteCheckedException {
- DiscoveryDataBag dataBag = new DiscoveryDataBag(joiningNodeId,
joiningNodeClient);
-
- if (commonData != null && !commonData.isEmpty())
- dataBag.commonData(unmarshalData(commonData, marsh, clsLdr,
clientNode, log, true));
-
- if (nodeSpecificData != null && !nodeSpecificData.isEmpty()) {
- Map<UUID, Map<Integer, Serializable>> unmarshNodeSpecData =
U.newLinkedHashMap(nodeSpecificData.size());
+ public DiscoveryDataBag bagWithNodeData(IgniteLogger log, Boolean client)
throws IgniteCheckedException {
+ checkUnmarshallingErrors(log, client);
- for (Map.Entry<UUID, Map<Integer, byte[]>> nodeBinEntry :
nodeSpecificData.entrySet()) {
- Map<Integer, byte[]> nodeBinData = nodeBinEntry.getValue();
-
- if (nodeBinData == null || nodeBinData.isEmpty())
- continue;
+ DiscoveryDataBag dataBag = new DiscoveryDataBag(joiningNodeId,
joiningNodeClient);
- unmarshNodeSpecData.put(
- nodeBinEntry.getKey(),
- unmarshalData(nodeBinData, marsh, clsLdr, clientNode, log,
true)
- );
- }
+ if (!F.isEmpty(commonData))
+ dataBag.commonData(commonData);
- dataBag.nodeSpecificData(unmarshNodeSpecData);
- }
+ if (!F.isEmpty(nodeSpecificData))
+ dataBag.nodeSpecificData(nodeSpecificData);
return dataBag;
}
/**
+ * @param log Ignite logger.
+ * @param client Client mode flag.
+ *
* @return Data bag with joining node data.
*/
- public DiscoveryDataBag bagWithJoiningNodeData() {
+ public DiscoveryDataBag bagWithJoiningNodeData(IgniteLogger log, @Nullable
Boolean client) throws IgniteCheckedException {
+ checkUnmarshallingErrors(log, client);
+
DiscoveryDataBag dataBag = new DiscoveryDataBag(joiningNodeId,
joiningNodeClient);
if (!F.isEmpty(joiningNodeData))
@@ -184,199 +159,68 @@ public class DiscoveryDataPacket implements
Serializable, Message {
}
/**
- * @param existingDataPacket Existing data packet.
- * @param mrgdCmnDataKeys Mrgd cmn data keys.
- * @param mrgdSpecifDataKeys Mrgd specif data keys.
+ * Dumps and throws caught unmarshalling errors.
+ *
+ * @param log Ignite logger.
+ * @param client Client mode flag.
+ * @throws IgniteCheckedException If unmarshalling errors occur.
*/
- public boolean mergeDataFrom(
- DiscoveryDataPacket existingDataPacket,
- Collection<Integer> mrgdCmnDataKeys,
- Collection<UUID> mrgdSpecifDataKeys
- ) {
- if (commonData.size() != mrgdCmnDataKeys.size()) {
- for (Map.Entry<Integer, byte[]> e : commonData.entrySet()) {
- if (!mrgdCmnDataKeys.contains(e.getKey())) {
- byte[] data =
existingDataPacket.commonData.get(e.getKey());
-
- if (data != null && Arrays.equals(e.getValue(), data)) {
- e.setValue(data);
-
- boolean add = mrgdCmnDataKeys.add(e.getKey());
+ public void checkUnmarshallingErrors(IgniteLogger log, @Nullable Boolean
client)
+ throws IgniteCheckedException {
+ if (unmarshErr != null)
+ throw unmarshErr;
- assert add;
+ Iterator<Map.Entry<Integer, Message>> dataIter =
compoundDataIterator();
- if (mrgdCmnDataKeys.size() == commonData.size())
- break;
- }
- }
- }
- }
+ IgniteCheckedException err = null;
- if (nodeSpecificData.size() != mrgdSpecifDataKeys.size()) {
- for (Map.Entry<UUID, Map<Integer, byte[]>> e :
nodeSpecificData.entrySet()) {
- if (!mrgdSpecifDataKeys.contains(e.getKey())) {
- Map<Integer, byte[]> data =
existingDataPacket.nodeSpecificData.get(e.getKey());
+ while (dataIter.hasNext()) {
+ Map.Entry<Integer, Message> item = dataIter.next();
- if (data != null && mapsEqual(e.getValue(), data)) {
- e.setValue(data);
+ if (item.getValue() instanceof SerializableDataBagItemWrapper
wrapper) {
+ int cmpId = item.getKey();
- boolean add = mrgdSpecifDataKeys.add(e.getKey());
+ IgniteCheckedException e = wrapper.unmarshallError();
- assert add;
+ if (e != null) {
+ if (CONTINUOUS_PROC.ordinal() == cmpId && X.hasCause(e,
ClassNotFoundException.class) && TRUE.equals(client)) {
+ U.warn(log, "Failed to unmarshal continuous query
remote filter on client node. " +
+ "Can be ignored.");
- if (mrgdSpecifDataKeys.size() ==
nodeSpecificData.size())
- break;
+ continue;
+ }
+ else if (cmpId <
GridComponent.DiscoveryDataExchangeType.VALUES.length) {
+ U.error(log, "Failed to unmarshal discovery data for
component: " +
+
GridComponent.DiscoveryDataExchangeType.VALUES[cmpId], e);
+ }
+ else {
+ U.warn(log, "Failed to unmarshal discovery data." +
+ " Component " + cmpId + " is not found.", e);
}
- }
- }
- }
-
- return (mrgdCmnDataKeys.size() == commonData.size()) &&
(mrgdSpecifDataKeys.size() == nodeSpecificData.size());
- }
-
- /**
- * @param m1 first map to compare.
- * @param m2 second map to compare.
- */
- private boolean mapsEqual(Map<Integer, byte[]> m1, Map<Integer, byte[]>
m2) {
- if (m1 == m2)
- return true;
-
- if (m1.size() == m2.size()) {
- for (Map.Entry<Integer, byte[]> e : m1.entrySet()) {
- byte[] data = m2.get(e.getKey());
-
- if (!Arrays.equals(e.getValue(), data))
- return false;
- }
-
- return true;
- }
-
- return false;
- }
-
- /**
- * @param src Source.
- * @param marsh Marsh.
- * @param clsLdr Class loader.
- * @param clientNode Client node.
- * @param log Logger.
- * @param panic Throw unmarshalling if {@code true}.
- * @throws IgniteCheckedException If {@code panic} is {@code True} and
unmarshalling failed.
- */
- private Map<Integer, Serializable> unmarshalData(
- Map<Integer, byte[]> src,
- Marshaller marsh,
- ClassLoader clsLdr,
- boolean clientNode,
- IgniteLogger log,
- boolean panic
- ) throws IgniteCheckedException {
- Map<Integer, Serializable> res = U.newHashMap(src.size());
-
- for (Map.Entry<Integer, byte[]> binEntry : src.entrySet()) {
- try {
- Serializable compData = isZipped(binEntry.getValue()) ?
- U.unmarshalZip(marsh, binEntry.getValue(), clsLdr) :
- U.unmarshal(marsh, binEntry.getValue(), clsLdr);
- res.put(binEntry.getKey(), compData);
- }
- catch (IgniteCheckedException e) {
- if (CONTINUOUS_PROC.ordinal() == binEntry.getKey() &&
- X.hasCause(e, ClassNotFoundException.class) && clientNode
- ) {
- U.warn(log, "Failed to unmarshal continuous query remote
filter on client node. Can be ignored.");
- continue;
- }
- else if (binEntry.getKey() <
GridComponent.DiscoveryDataExchangeType.VALUES.length) {
- U.error(log,
- "Failed to unmarshal discovery data for component: " +
-
GridComponent.DiscoveryDataExchangeType.VALUES[binEntry.getKey()],
- e
- );
- }
- else {
- U.warn(log, "Failed to unmarshal discovery data." +
- " Component " + binEntry.getKey() + " is not found.");
+ if (err == null)
+ err = e;
+ else
+ err.addSuppressed(e);
}
-
- if (panic)
- throw e;
}
}
- return res;
- }
-
- /**
- * @param val Value to check.
- * @return {@code true} if value is zipped.
- */
- private boolean isZipped(byte[] val) {
- return val != null && val.length > 3 && makeInt(val) ==
ZIP_HEADER_SIGNATURE;
- }
-
- /**
- * Make int from first 4 bytes in little-endian byte order.
- *
- * @param b Source of bytes.
- * @return Made int.
- */
- private static int makeInt(byte[] b) {
- return (((b[3]) << 24) |
- ((b[2] & 0xff) << 16) |
- ((b[1] & 0xff) << 8) |
- ((b[0] & 0xff)));
- }
+ if (err != null) {
+ unmarshErr = err;
- /**
- * @param src Source.
- * @param target Target.
- * @param marsh Marsh.
- * @param log Logger.
- */
- private void marshalData(
- Map<Integer, Serializable> src,
- Map<Integer, byte[]> target,
- Marshaller marsh,
- int compressionLevel,
- IgniteLogger log
- ) {
- // may happen if nothing was collected from components,
- // corresponding map (for common data or for node specific data) left
null
- if (src == null)
- return;
-
- for (Map.Entry<Integer, Serializable> entry : src.entrySet()) {
- try {
- target.put(entry.getKey(), U.zip(U.marshal(marsh,
entry.getValue()), compressionLevel));
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to marshal discovery data " +
- "[comp=" + entry.getKey() + ", data=" +
entry.getValue() + ']', e);
- }
+ throw err;
}
}
- /** */
- private void filterDuplicatedData(Map<Integer, byte[]> discoData) {
- for (Map<Integer, byte[]> existingData : nodeSpecificData.values()) {
- Iterator<Map.Entry<Integer, byte[]>> it =
discoData.entrySet().iterator();
-
- while (it.hasNext()) {
- Map.Entry<Integer, byte[]> discoDataEntry = it.next();
-
- byte[] curData = existingData.get(discoDataEntry.getKey());
-
- if (Arrays.equals(curData, discoDataEntry.getValue()))
- it.remove();
- }
-
- if (discoData.isEmpty())
- break;
- }
+ /** @return Iterator through all messages, stored in DataPacket. */
+ private Iterator<Map.Entry<Integer, Message>> compoundDataIterator() {
+ return F.concat(joiningNodeData.entrySet().iterator(),
+ commonData.entrySet().iterator(),
+ nodeSpecificData.values()
+ .stream()
+ .flatMap(m -> m.entrySet().stream())
+ .iterator());
}
/**
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java
index 30a7ec80298..49a9ccc63c1 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchangeTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.lang.IgniteProductVersion;
+import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiContext;
import org.apache.ignite.spi.IgniteSpiException;
@@ -226,6 +227,11 @@ public class DiscoverySpiDataExchangeTest extends
GridCommonAbstractTest {
delegate.resolveCommunicationFailure(node, err);
}
+ /** {@inheritDoc} Forwards the call to {@code delegate}. */
+ @Override public MessageFactoryProvider messageFactoryProvider() {
+ return delegate.messageFactoryProvider();
+ }
+
/** Delegated discovery data exchange. */
private class DelegatedDiscoverySpiDataExchange implements
DiscoverySpiDataExchange {
/** Discovery data exchange delegate. */
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index ad98a2936b9..4920d029538 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -58,6 +58,8 @@ import
org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import
org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage;
+import org.apache.ignite.internal.processors.marshaller.MappedName;
+import org.apache.ignite.internal.processors.marshaller.MarshallerDataBagItem;
import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -2431,7 +2433,7 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
DiscoveryDataBag bag = exchange.collect(dataBag);
if
(bag.commonData().containsKey(MARSHALLER_PROC.ordinal()))
- marshalledItems =
getJavaMappings(getAllMappings(dataBag)).size();
+ marshalledItems =
getJavaMappings(marshallerDataBagItem(dataBag)).size();
return bag;
}
@@ -2440,12 +2442,12 @@ public class TcpDiscoverySelfTest extends
GridCommonAbstractTest {
exchange.onExchange(dataBag);
}
- private List getAllMappings(DiscoveryDataBag bag) {
- return
(List)bag.commonData().get(MARSHALLER_PROC.ordinal());
+ private MarshallerDataBagItem
marshallerDataBagItem(DiscoveryDataBag bag) {
+ return
(MarshallerDataBagItem)bag.commonData().get(MARSHALLER_PROC.ordinal());
}
- private Map getJavaMappings(List allMappings) {
- return (Map)allMappings.get(JAVA_ID);
+ private Map<Integer, MappedName>
getJavaMappings(MarshallerDataBagItem marshallerDataBagItem) {
+ return marshallerDataBagItem.mappings().get(JAVA_ID);
}
});
}
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
index a186aed5265..7bdc5ff8ea4 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkBulkJoinContext.java
@@ -17,28 +17,28 @@
package org.apache.ignite.spi.discovery.zk.internal;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.plugin.extensions.communication.Message;
/**
*
*/
class ZkBulkJoinContext {
/** */
- List<T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>>> nodes;
+ List<T2<ZkJoinedNodeEvtData, ZkDiscoDataBagWrapper>> nodes;
/**
* @param nodeEvtData Node event data.
* @param discoData Discovery data for node.
*/
- void addJoinedNode(ZkJoinedNodeEvtData nodeEvtData, Map<Integer,
Serializable> discoData) {
+ void addJoinedNode(ZkJoinedNodeEvtData nodeEvtData, Map<Integer, Message>
discoData) {
if (nodes == null)
nodes = new ArrayList<>();
- nodes.add(new T2<>(nodeEvtData, discoData));
+ nodes.add(new T2<>(nodeEvtData, new ZkDiscoDataBagWrapper(discoData)));
}
/**
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoDataBagWrapper.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoDataBagWrapper.java
new file mode 100644
index 00000000000..28a188ec0ca
--- /dev/null
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoDataBagWrapper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.zk.internal;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.spi.discovery.SerializableDataBagItemWrapper;
+
+/** Data bag data holder: a {@link Message} that carries a map of discovery data items keyed by integer id. */
+public class ZkDiscoDataBagWrapper implements Message {
+ /** Discovery data items by integer key (the only {@link Order}-annotated field of this message). */
+ @Order(0)
+ Map<Integer, Message> data;
+
+ /** Error thrown by an earlier {@link #unmarshalledData()} call; cached so that later calls rethrow it without rescanning. */
+ private IgniteCheckedException unmarshErr;
+
+ /** Default constructor for {@link MessageFactory}. */
+ public ZkDiscoDataBagWrapper() {
+ // No-op.
+ }
+
+ /** @param data Discovery data. */
+ public ZkDiscoDataBagWrapper(Map<Integer, Message> data) {
+ this.data = data;
+ }
+
+ /**
+ * Returns data, or throws the first unmarshalling error found among the items with any further ones added as suppressed.
+ *
+ * @return Data map exactly as stored in this wrapper (no copy is made).
+ * @throws IgniteCheckedException Unmarshalling error reported by {@link SerializableDataBagItemWrapper} items, if any.
+ */
+ public Map<Integer, Message> unmarshalledData() throws
IgniteCheckedException {
+ if (unmarshErr != null) // A previous call already found an error: rethrow the cached instance.
+ throw unmarshErr;
+
+ IgniteCheckedException err = null;
+
+ for (Message msg : data.values()) { // NOTE(review): assumes data is non-null, also after the default constructor - confirm.
+ if (msg instanceof SerializableDataBagItemWrapper wrapper &&
wrapper.unmarshallError() != null) {
+ IgniteCheckedException e = wrapper.unmarshallError();
+
+ if (err == null)
+ err = e;
+ else
+ err.addSuppressed(e);
+ }
+ }
+
+ if (err != null) {
+ unmarshErr = err; // Cache the aggregated error for subsequent calls.
+
+ throw err;
+ }
+
+ return data;
+ }
+}
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java
index 39d89b32af8..cf9f95a908f 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkMessageFactory.java
@@ -28,5 +28,6 @@ public class ZkMessageFactory implements
MessageFactoryProvider {
factory.register(401, ZkCommunicationErrorResolveStartMessage::new,
new ZkCommunicationErrorResolveStartMessageSerializer());
factory.register(402, ZkForceNodeFailMessage::new, new
ZkForceNodeFailMessageSerializer());
factory.register(403, ZkNoServersMessage::new, new
ZkNoServersMessageSerializer());
+ factory.register(404, ZkDiscoDataBagWrapper::new, new
ZkDiscoDataBagWrapperSerializer());
}
}
diff --git
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 0d0531d1a9a..f69e9f6d106 100644
---
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -78,6 +78,7 @@ import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.security.SecurityCredentials;
import org.apache.ignite.spi.IgniteNodeValidationResult;
@@ -1781,7 +1782,7 @@ public class ZookeeperDiscoveryImpl {
long evtId = rtState.evtsData.evtIdGen;
- List<T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>>> nodes =
joinCtx.nodes;
+ List<T2<ZkJoinedNodeEvtData, ZkDiscoDataBagWrapper>> nodes =
joinCtx.nodes;
assert nodes != null && !nodes.isEmpty();
@@ -1793,11 +1794,9 @@ public class ZookeeperDiscoveryImpl {
Map<Long, Long> dupDiscoData = null;
for (int i = 0; i < nodeCnt; i++) {
- T2<ZkJoinedNodeEvtData, Map<Integer, Serializable>> nodeEvtData =
nodes.get(i);
+ T2<ZkJoinedNodeEvtData, ZkDiscoDataBagWrapper> nodeEvtData =
nodes.get(i);
- Map<Integer, Serializable> discoData = nodeEvtData.get2();
-
- byte[] discoDataBytes = U.marshal(marsh, discoData);
+ byte[] discoDataBytes = msgParser.marshalZip(nodeEvtData.get2());
Long dupDataNode = null;
@@ -2251,7 +2250,7 @@ public class ZookeeperDiscoveryImpl {
exchange.collect(collectBag);
- Map<Integer, Serializable> commonData = collectBag.commonData();
+ Map<Integer, Message> commonData = collectBag.commonData();
Object old = curTop.put(joinedNode.order(), joinedNode);
@@ -3021,12 +3020,11 @@ public class ZookeeperDiscoveryImpl {
byte[] discoDataBytes =
dataForJoined.discoveryDataForNode(locNode.order());
- Map<Integer, Serializable> commonDiscoData =
- marsh.unmarshal(discoDataBytes,
U.resolveClassLoader(spi.ignite().configuration()));
+ ZkDiscoDataBagWrapper zkDataBagWrapper =
msgParser.unmarshalZip(discoDataBytes);
DiscoveryDataBag dataBag = new DiscoveryDataBag(locNode.id(),
locNode.isClient());
- dataBag.commonData(commonDiscoData);
+ dataBag.commonData(zkDataBagWrapper.unmarshalledData());
exchange.onExchange(dataBag);