sergey-chugunov-1985 commented on code in PR #12620:
URL: https://github.com/apache/ignite/pull/12620#discussion_r2698700253
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -3399,10 +3390,11 @@ private void
sendMessageToClients(TcpDiscoveryAbstractMessage msg) {
for (ClientMessageWorker clientMsgWorker :
clientMsgWorkers.values()) {
if (msgBytes == null) {
try {
- msgBytes = U.marshal(spi.marshaller(), msg);
+ msgBytes =
clientMsgWorker.ses.serializeMessage(msg);
}
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to marshal message: " + msg,
e);
+ catch (IgniteCheckedException | IOException e) {
+ U.error(log, "Failed to serialize message to a
client: " + msg + ", client id: "
Review Comment:
```suggestion
U.error(log, "Failed to serialize message: " +
msg + ", recipient client id: "
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java:
##########
@@ -320,4 +306,147 @@ private void detectSslAlert(byte firstByte, InputStream
in) throws IOException {
if (hex.matches("15....00"))
throw new StreamCorruptedException("invalid stream header: " +
hex);
}
+
+ /**
+ * Input stream allowing to read some data first as a prefix of an
original input stream.
Review Comment:
```suggestion
 * Input stream implementation that combines a byte array and a regular InputStream:
 * bytes are read from the array first, then reading proceeds from the InputStream.
```
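A minimal sketch of the behaviour this Javadoc describes (array bytes first, then the underlying stream), built on the JDK's `SequenceInputStream`. This is not the PR's implementation: the class in the PR extends `BufferedInputStream` and receives the byte array after construction, whereas here both parts are passed up front. The names `CompositeStreamSketch`, `prefixed` and `readAll` are hypothetical and exist only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Illustration only: hypothetical names, not code from the PR. */
class CompositeStreamSketch {
    /** Returns a stream that yields {@code prefix} first and then the bytes of {@code src}. */
    static InputStream prefixed(byte[] prefix, InputStream src) {
        return new SequenceInputStream(new ByteArrayInputStream(prefix), src);
    }

    /** Drains the stream byte by byte and decodes the result as UTF-8. */
    static String readAll(InputStream in) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        try {
            int b;

            // read() returns -1 only after both the array and the source stream are exhausted.
            while ((b = in.read()) != -1)
                out.write(b);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

With an array holding "ab" and a source stream holding "cd", `readAll(prefixed(...))` returns "abcd", which is the ordering the suggested Javadoc wording is meant to convey.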
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java:
##########
@@ -320,4 +306,147 @@ private void detectSslAlert(byte firstByte, InputStream
in) throws IOException {
if (hex.matches("15....00"))
throw new StreamCorruptedException("invalid stream header: " +
hex);
}
+
+ /**
+ * Input stream allowing to read some data first as a prefix of an
original input stream.
+ * Supports only basic read methods.
+ */
+ private static class PrefixedBufferedInputStream extends
BufferedInputStream {
+ /** Prefix data input stream to read before the original input stream.
*/
+ @Nullable private ByteArrayInputStream prefixIs;
+
+ /** @param srcIs Original input stream to read when {@link #prefixIs}
is empty. */
+ private PrefixedBufferedInputStream(InputStream srcIs) {
+ super(srcIs);
+ }
+
+ /** @param prefixData Prefix data to read before the original input
stream. */
+ private void acceptPrefixBuffer(byte[] prefixData) {
Review Comment:
```suggestion
private void attachByteArray(byte[] attachment) {
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java:
##########
@@ -6073,9 +6066,11 @@ private void
processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) {
}
else {
// Message is on its second ring.
- removeMetrics(msg, locNodeId);
+ msg.removeServerMetrics(locNodeId);
- Collection<UUID> clientNodeIds = msg.clientNodeIds();
+ Collection<UUID> clientNodeIds = msg.clientNodeIds() ==
null
Review Comment:
Is `F.emptyIfNull` applicable here?
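A minimal sketch of the null-to-empty pattern the question refers to. The helper below is a local, hypothetical stand-in written for illustration; it is assumed, not verified here, that `F.emptyIfNull` behaves analogously for collections.

```java
import java.util.Collection;
import java.util.Collections;

/** Illustration only: a hypothetical stand-in, not Ignite code. */
class EmptyIfNullSketch {
    /** Returns the given collection as is, or an immutable empty collection when it is null. */
    static <T> Collection<T> emptyIfNull(Collection<T> col) {
        return col == null ? Collections.<T>emptyList() : col;
    }
}
```

With such a helper the explicit `== null` ternary in the hunk could collapse into a single call, e.g. `emptyIfNull(msg.clientNodeIds())`, provided the result is only iterated and never modified.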
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java:
##########
@@ -320,4 +306,147 @@ private void detectSslAlert(byte firstByte, InputStream
in) throws IOException {
if (hex.matches("15....00"))
throw new StreamCorruptedException("invalid stream header: " +
hex);
}
+
+ /**
+ * Input stream allowing to read some data first as a prefix of an
original input stream.
+ * Supports only basic read methods.
+ */
+ private static class PrefixedBufferedInputStream extends
BufferedInputStream {
+ /** Prefix data input stream to read before the original input stream.
*/
+ @Nullable private ByteArrayInputStream prefixIs;
Review Comment:
```suggestion
@Nullable private ByteArrayInputStream attachedBytesIs;
```
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodesMetricsMapMessage.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/** Holds map of nodes metrics messages per node id. */
+public class TcpDiscoveryNodesMetricsMapMessage implements Message {
Review Comment:
But at the same time we use it only for collecting and distributing client
metrics, and there are no plans to develop discovery-based metrics further.
So I still think giving this class a more specific name wouldn't harm the
existing codebase and would make it more readable instead.
There are five other classes matching "TcpDiscovery*MetricsMessage", and it
is already hard to build a mental model of what's going on. Giving a sixth one
a vague name makes the situation even worse.
##########
modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java:
##########
@@ -320,4 +306,147 @@ private void detectSslAlert(byte firstByte, InputStream
in) throws IOException {
if (hex.matches("15....00"))
throw new StreamCorruptedException("invalid stream header: " +
hex);
}
+
+ /**
+ * Input stream allowing to read some data first as a prefix of an
original input stream.
+ * Supports only basic read methods.
+ */
+ private static class PrefixedBufferedInputStream extends
BufferedInputStream {
Review Comment:
```suggestion
private static class CompositeInputStream extends BufferedInputStream {
```