This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d800f111f8 IGNITE-25912 Use MessageSerializer for SchemaOperationStatusMessage (#12300)
1d800f111f8 is described below

commit 1d800f111f87dd0cce14239da1a1ad665f88c463
Author: Dmitry Werner <[email protected]>
AuthorDate: Mon Sep 1 16:28:12 2025 +0500

    IGNITE-25912 Use MessageSerializer for SchemaOperationStatusMessage (#12300)
---
 .../communication/GridIoMessageFactory.java        |   3 +-
 .../processors/query/GridQueryProcessor.java       |  67 ++--------
 .../query/schema/SchemaOperationException.java     |  12 ++
 .../message/SchemaOperationStatusMessage.java      | 135 ++++++++-------------
 4 files changed, 80 insertions(+), 137 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 9746ed087bb..f7a7e818fc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.codegen.MetadataRequestMessageSerializer;
 import org.apache.ignite.internal.codegen.MissingMappingRequestMessageSerializer;
 import org.apache.ignite.internal.codegen.MissingMappingResponseMessageSerializer;
 import org.apache.ignite.internal.codegen.NearCacheUpdatesSerializer;
+import org.apache.ignite.internal.codegen.SchemaOperationStatusMessageSerializer;
 import org.apache.ignite.internal.codegen.SessionChannelMessageSerializer;
 import org.apache.ignite.internal.codegen.TcpInverseConnectionResponseMessageSerializer;
 import org.apache.ignite.internal.codegen.TxLockSerializer;
@@ -212,7 +213,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
         // -54 is reserved for SQL.
         // -46 ... -51 - snapshot messages.
         factory.register((short)-61, IgniteDiagnosticMessage::new);
-        factory.register((short)-53, SchemaOperationStatusMessage::new);
+        factory.register((short)-53, SchemaOperationStatusMessage::new, new SchemaOperationStatusMessageSerializer());
         factory.register((short)-52, GridIntList::new, new GridIntListSerializer());
         factory.register((short)-51, NearCacheUpdates::new, new NearCacheUpdatesSerializer());
         factory.register((short)-50, GridNearAtomicCheckUpdateRequest::new);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 77ff80730f6..b0cc3cc5964 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -152,7 +152,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.session.SessionContext;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
@@ -206,9 +205,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     /** For tests. */
     public static Class<? extends GridQueryIndexing> idxCls;
 
-    /** JDK marshaller to serialize errors. */
-    private final JdkMarshaller marsh;
-
     /** */
     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
 
@@ -360,7 +356,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
         idxBuildStatusStorage = new IndexBuildStatusStorage(ctx);
         txAwareQueriesEnabled = U.isTxAwareQueriesEnabled(ctx);
-        marsh = ctx.marshallerContext().jdkMarshaller();
     }
 
     /** {@inheritDoc} */
@@ -3903,15 +3898,18 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param err Error.
      * @param nop No-op flag.
      */
-    public void sendStatusMessage(UUID destNodeId, UUID opId, SchemaOperationException err, boolean nop) {
+    public void sendStatusMessage(UUID destNodeId, UUID opId, @Nullable SchemaOperationException err, boolean nop) {
         if (log.isDebugEnabled())
             log.debug("Sending schema operation status message [opId=" + opId + ", crdNode=" + destNodeId +
                 ", err=" + err + ", nop=" + nop + ']');
 
         try {
-            byte[] errBytes = marshalSchemaError(opId, err);
-
-            SchemaOperationStatusMessage msg = new SchemaOperationStatusMessage(opId, errBytes, nop);
+            SchemaOperationStatusMessage msg = new SchemaOperationStatusMessage(
+                opId,
+                err != null ? err.code() : -1,
+                err != null ? err.getMessage() : null,
+                nop
+            );
 
             // Messages must go to dedicated schema pool. We cannot push them to query pool because in this case
             // they could be blocked with other query requests.
@@ -3952,7 +3950,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                         log.debug("Received status message [opId=" + msg.operationId() +
                             ", sndNodeId=" + msg.senderNodeId() + ']');
 
-                    op.manager().onNodeFinished(msg.senderNodeId(), unmarshalSchemaError(msg.errorBytes()), msg.nop());
+                    op.manager().onNodeFinished(msg.senderNodeId(), schemaError(msg), msg.nop());
 
                     return;
                 }
@@ -3982,7 +3980,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             SchemaOperationStatusMessage msg = it.next();
 
             if (Objects.equals(msg.operationId(), opId)) {
-                mgr.onNodeFinished(msg.senderNodeId(), unmarshalSchemaError(msg.errorBytes()), msg.nop());
+                mgr.onNodeFinished(msg.senderNodeId(), schemaError(msg), msg.nop());
 
                 it.remove();
             }
@@ -3990,50 +3988,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Marshal schema error.
-     *
-     * @param err Error.
-     * @return Error bytes.
-     */
-    @Nullable private byte[] marshalSchemaError(UUID opId, @Nullable SchemaOperationException err) {
-        if (err == null)
-            return null;
-
-        try {
-            return U.marshal(marsh, err);
-        }
-        catch (Exception e) {
-            U.warn(log, "Failed to marshal schema operation error [opId=" + opId + ", err=" + err + ']', e);
-
-            try {
-                return U.marshal(marsh, new SchemaOperationException("Operation failed, but error cannot be " +
-                    "serialized (see local node log for more details) [opId=" + opId + ", nodeId=" +
-                    ctx.localNodeId() + ']'));
-            }
-            catch (Exception e0) {
-                assert false; // Impossible situation.
-
-                return null;
-            }
-        }
-    }
-
-    /**
-     * Unmarshal schema error.
-     *
-     * @param errBytes Error bytes.
-     * @return Error.
+     * @param msg Status message.
+     * @return SchemaOperationException or null.
      */
-    @Nullable private SchemaOperationException unmarshalSchemaError(@Nullable byte[] errBytes) {
-        if (errBytes == null)
-            return null;
-
-        try {
-            return U.unmarshal(marsh, errBytes, U.resolveClassLoader(ctx.config()));
-        }
-        catch (Exception e) {
-            return new SchemaOperationException("Operation failed, but error cannot be deserialized.");
-        }
+    @Nullable private SchemaOperationException schemaError(SchemaOperationStatusMessage msg) {
+        return msg.errorMessage() != null ? new SchemaOperationException(msg.errorMessage(), msg.errorCode()) : null;
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
index bb6eb883dc5..02b4282afed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
@@ -78,6 +78,18 @@ public class SchemaOperationException extends IgniteCheckedException {
         this.code = code;
     }
 
+    /**
+     * Constructor for specific error type.
+     *
+     * @param msg Message.
+     * @param code Code.
+     */
+    public SchemaOperationException(String msg, int code) {
+        super(msg);
+
+        this.code = code;
+    }
+
     /**
      * Constructor for generic error.
      *
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java
index b6671588bf8..5eab237e33f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java
@@ -17,32 +17,33 @@
 
 package org.apache.ignite.internal.processors.query.schema.message;
 
-import java.nio.ByteBuffer;
 import java.util.UUID;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Schema operation status message.
  */
-@IgniteCodeGeneratingFail
 public class SchemaOperationStatusMessage implements Message {
     /** Operation ID. */
+    @Order(value = 0, method = "operationId")
     private UUID opId;
 
-    /** Error bytes (if any). */
-    private byte[] errBytes;
+    /** Error code. */
+    @Order(value = 1, method = "errorCode")
+    private int errCode;
+
+    /** Error message. */
+    @Order(value = 2, method = "errorMessage")
+    private String errMsg;
 
     /** Sender node ID. */
-    @GridDirectTransient
     private UUID sndNodeId;
 
     /** No-op flag. */
+    @Order(3)
     private boolean nop;
 
     /**
@@ -56,12 +57,14 @@ public class SchemaOperationStatusMessage implements Message {
      * Constructor.
      *
      * @param opId Operation ID.
-     * @param errBytes Error bytes.
+     * @param errCode Error code.
+     * @param errMsg Error message.
      * @param nop No-op flag.
      */
-    public SchemaOperationStatusMessage(UUID opId, byte[] errBytes, boolean nop) {
+    public SchemaOperationStatusMessage(UUID opId, int errCode, @Nullable String errMsg, boolean nop) {
         this.opId = opId;
-        this.errBytes = errBytes;
+        this.errCode = errCode;
+        this.errMsg = errMsg;
         this.nop = nop;
     }
 
@@ -73,91 +76,52 @@ public class SchemaOperationStatusMessage implements Message {
     }
 
     /**
-     * @return Error bytes.
+     * @param opId Operation ID.
      */
-    @Nullable public byte[] errorBytes() {
-        return errBytes;
+    public void operationId(UUID opId) {
+        this.opId = opId;
     }
 
     /**
-     * @return Sender node ID.
+     * @return Error code.
      */
-    public UUID senderNodeId() {
-        return sndNodeId;
+    public int errorCode() {
+        return errCode;
     }
 
     /**
-     * @param sndNodeId Sender node ID.
+     * @param errCode Error code.
      */
-    public void senderNodeId(UUID sndNodeId) {
-        this.sndNodeId = sndNodeId;
+    public void errorCode(int errCode) {
+        this.errCode = errCode;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeUuid(opId))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeByteArray(errBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
-                if (!writer.writeBoolean(nop))
-                    return false;
-
-                writer.incrementState();
-        }
-
-        return true;
+    /**
+     * @return Error message.
+     */
+    @Nullable public String errorMessage() {
+        return errMsg;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        switch (reader.state()) {
-            case 0:
-                opId = reader.readUuid();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                errBytes = reader.readByteArray();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 2:
-                nop = reader.readBoolean();
-
-                if (!reader.isLastRead())
-                    return false;
+    /**
+     * @param errMsg Error message.
+     */
+    public void errorMessage(String errMsg) {
+        this.errMsg = errMsg;
+    }
 
-                reader.incrementState();
-        }
+    /**
+     * @return Sender node ID.
+     */
+    public UUID senderNodeId() {
+        return sndNodeId;
+    }
 
-        return true;
+    /**
+     * @param sndNodeId Sender node ID.
+     */
+    public void senderNodeId(UUID sndNodeId) {
+        this.sndNodeId = sndNodeId;
     }
 
     /**
@@ -167,6 +131,13 @@ public class SchemaOperationStatusMessage implements Message {
         return nop;
     }
 
+    /**
+     * @param nop <code>True</code> if message is no-op.
+     */
+    public void nop(boolean nop) {
+        this.nop = nop;
+    }
+
     /** {@inheritDoc} */
     @Override public short directType() {
         return -53;

Reply via email to