This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 1d800f111f8 IGNITE-25912 Use MessageSerializer for
SchemaOperationStatusMessage (#12300)
1d800f111f8 is described below
commit 1d800f111f87dd0cce14239da1a1ad665f88c463
Author: Dmitry Werner <[email protected]>
AuthorDate: Mon Sep 1 16:28:12 2025 +0500
IGNITE-25912 Use MessageSerializer for SchemaOperationStatusMessage (#12300)
---
.../communication/GridIoMessageFactory.java | 3 +-
.../processors/query/GridQueryProcessor.java | 67 ++--------
.../query/schema/SchemaOperationException.java | 12 ++
.../message/SchemaOperationStatusMessage.java | 135 ++++++++-------------
4 files changed, 80 insertions(+), 137 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 9746ed087bb..f7a7e818fc5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -51,6 +51,7 @@ import
org.apache.ignite.internal.codegen.MetadataRequestMessageSerializer;
import
org.apache.ignite.internal.codegen.MissingMappingRequestMessageSerializer;
import
org.apache.ignite.internal.codegen.MissingMappingResponseMessageSerializer;
import org.apache.ignite.internal.codegen.NearCacheUpdatesSerializer;
+import
org.apache.ignite.internal.codegen.SchemaOperationStatusMessageSerializer;
import org.apache.ignite.internal.codegen.SessionChannelMessageSerializer;
import
org.apache.ignite.internal.codegen.TcpInverseConnectionResponseMessageSerializer;
import org.apache.ignite.internal.codegen.TxLockSerializer;
@@ -212,7 +213,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
// -54 is reserved for SQL.
// -46 ... -51 - snapshot messages.
factory.register((short)-61, IgniteDiagnosticMessage::new);
- factory.register((short)-53, SchemaOperationStatusMessage::new);
+ factory.register((short)-53, SchemaOperationStatusMessage::new, new
SchemaOperationStatusMessageSerializer());
factory.register((short)-52, GridIntList::new, new
GridIntListSerializer());
factory.register((short)-51, NearCacheUpdates::new, new
NearCacheUpdatesSerializer());
factory.register((short)-50, GridNearAtomicCheckUpdateRequest::new);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 77ff80730f6..b0cc3cc5964 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -152,7 +152,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.session.SessionContext;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
@@ -206,9 +205,6 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
/** For tests. */
public static Class<? extends GridQueryIndexing> idxCls;
- /** JDK marshaller to serialize errors. */
- private final JdkMarshaller marsh;
-
/** */
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
@@ -360,7 +356,6 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
idxBuildStatusStorage = new IndexBuildStatusStorage(ctx);
txAwareQueriesEnabled = U.isTxAwareQueriesEnabled(ctx);
- marsh = ctx.marshallerContext().jdkMarshaller();
}
/** {@inheritDoc} */
@@ -3903,15 +3898,18 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
* @param err Error.
* @param nop No-op flag.
*/
- public void sendStatusMessage(UUID destNodeId, UUID opId,
SchemaOperationException err, boolean nop) {
+ public void sendStatusMessage(UUID destNodeId, UUID opId, @Nullable
SchemaOperationException err, boolean nop) {
if (log.isDebugEnabled())
log.debug("Sending schema operation status message [opId=" + opId
+ ", crdNode=" + destNodeId +
", err=" + err + ", nop=" + nop + ']');
try {
- byte[] errBytes = marshalSchemaError(opId, err);
-
- SchemaOperationStatusMessage msg = new
SchemaOperationStatusMessage(opId, errBytes, nop);
+ SchemaOperationStatusMessage msg = new
SchemaOperationStatusMessage(
+ opId,
+ err != null ? err.code() : -1,
+ err != null ? err.getMessage() : null,
+ nop
+ );
// Messages must go to dedicated schema pool. We cannot push them
to query pool because in this case
// they could be blocked with other query requests.
@@ -3952,7 +3950,7 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
log.debug("Received status message [opId=" +
msg.operationId() +
", sndNodeId=" + msg.senderNodeId() + ']');
- op.manager().onNodeFinished(msg.senderNodeId(),
unmarshalSchemaError(msg.errorBytes()), msg.nop());
+ op.manager().onNodeFinished(msg.senderNodeId(),
schemaError(msg), msg.nop());
return;
}
@@ -3982,7 +3980,7 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
SchemaOperationStatusMessage msg = it.next();
if (Objects.equals(msg.operationId(), opId)) {
- mgr.onNodeFinished(msg.senderNodeId(),
unmarshalSchemaError(msg.errorBytes()), msg.nop());
+ mgr.onNodeFinished(msg.senderNodeId(), schemaError(msg),
msg.nop());
it.remove();
}
@@ -3990,50 +3988,11 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
}
/**
- * Marshal schema error.
- *
- * @param err Error.
- * @return Error bytes.
- */
- @Nullable private byte[] marshalSchemaError(UUID opId, @Nullable
SchemaOperationException err) {
- if (err == null)
- return null;
-
- try {
- return U.marshal(marsh, err);
- }
- catch (Exception e) {
- U.warn(log, "Failed to marshal schema operation error [opId=" +
opId + ", err=" + err + ']', e);
-
- try {
- return U.marshal(marsh, new
SchemaOperationException("Operation failed, but error cannot be " +
- "serialized (see local node log for more details) [opId="
+ opId + ", nodeId=" +
- ctx.localNodeId() + ']'));
- }
- catch (Exception e0) {
- assert false; // Impossible situation.
-
- return null;
- }
- }
- }
-
- /**
- * Unmarshal schema error.
- *
- * @param errBytes Error bytes.
- * @return Error.
+ * @param msg Status message.
+ * @return SchemaOperationException or null.
*/
- @Nullable private SchemaOperationException unmarshalSchemaError(@Nullable
byte[] errBytes) {
- if (errBytes == null)
- return null;
-
- try {
- return U.unmarshal(marsh, errBytes,
U.resolveClassLoader(ctx.config()));
- }
- catch (Exception e) {
- return new SchemaOperationException("Operation failed, but error
cannot be deserialized.");
- }
+ @Nullable private SchemaOperationException
schemaError(SchemaOperationStatusMessage msg) {
+ return msg.errorMessage() != null ? new
SchemaOperationException(msg.errorMessage(), msg.errorCode()) : null;
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
index bb6eb883dc5..02b4282afed 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
@@ -78,6 +78,18 @@ public class SchemaOperationException extends
IgniteCheckedException {
this.code = code;
}
+ /**
+ * Constructor for specific error type.
+ *
+ * @param msg Message.
+ * @param code Code.
+ */
+ public SchemaOperationException(String msg, int code) {
+ super(msg);
+
+ this.code = code;
+ }
+
/**
* Constructor for generic error.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java
index b6671588bf8..5eab237e33f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java
@@ -17,32 +17,33 @@
package org.apache.ignite.internal.processors.query.schema.message;
-import java.nio.ByteBuffer;
import java.util.UUID;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
* Schema operation status message.
*/
-@IgniteCodeGeneratingFail
public class SchemaOperationStatusMessage implements Message {
/** Operation ID. */
+ @Order(value = 0, method = "operationId")
private UUID opId;
- /** Error bytes (if any). */
- private byte[] errBytes;
+ /** Error code. */
+ @Order(value = 1, method = "errorCode")
+ private int errCode;
+
+ /** Error message. */
+ @Order(value = 2, method = "errorMessage")
+ private String errMsg;
/** Sender node ID. */
- @GridDirectTransient
private UUID sndNodeId;
/** No-op flag. */
+ @Order(3)
private boolean nop;
/**
@@ -56,12 +57,14 @@ public class SchemaOperationStatusMessage implements
Message {
* Constructor.
*
* @param opId Operation ID.
- * @param errBytes Error bytes.
+ * @param errCode Error code.
+ * @param errMsg Error message.
* @param nop No-op flag.
*/
- public SchemaOperationStatusMessage(UUID opId, byte[] errBytes, boolean
nop) {
+ public SchemaOperationStatusMessage(UUID opId, int errCode, @Nullable
String errMsg, boolean nop) {
this.opId = opId;
- this.errBytes = errBytes;
+ this.errCode = errCode;
+ this.errMsg = errMsg;
this.nop = nop;
}
@@ -73,91 +76,52 @@ public class SchemaOperationStatusMessage implements
Message {
}
/**
- * @return Error bytes.
+ * @param opId Operation ID.
*/
- @Nullable public byte[] errorBytes() {
- return errBytes;
+ public void operationId(UUID opId) {
+ this.opId = opId;
}
/**
- * @return Sender node ID.
+ * @return Error code.
*/
- public UUID senderNodeId() {
- return sndNodeId;
+ public int errorCode() {
+ return errCode;
}
/**
- * @param sndNodeId Sender node ID.
+ * @param errCode Error code.
*/
- public void senderNodeId(UUID sndNodeId) {
- this.sndNodeId = sndNodeId;
+ public void errorCode(int errCode) {
+ this.errCode = errCode;
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeUuid(opId))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeByteArray(errBytes))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeBoolean(nop))
- return false;
-
- writer.incrementState();
- }
-
- return true;
+ /**
+ * @return Error message.
+ */
+ @Nullable public String errorMessage() {
+ return errMsg;
}
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- opId = reader.readUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- errBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- nop = reader.readBoolean();
-
- if (!reader.isLastRead())
- return false;
+ /**
+ * @param errMsg Error message.
+ */
+ public void errorMessage(String errMsg) {
+ this.errMsg = errMsg;
+ }
- reader.incrementState();
- }
+ /**
+ * @return Sender node ID.
+ */
+ public UUID senderNodeId() {
+ return sndNodeId;
+ }
- return true;
+ /**
+ * @param sndNodeId Sender node ID.
+ */
+ public void senderNodeId(UUID sndNodeId) {
+ this.sndNodeId = sndNodeId;
}
/**
@@ -167,6 +131,13 @@ public class SchemaOperationStatusMessage implements
Message {
return nop;
}
+ /**
+ * @param nop <code>True</code> if message is no-op.
+ */
+ public void nop(boolean nop) {
+ this.nop = nop;
+ }
+
/** {@inheritDoc} */
@Override public short directType() {
return -53;