This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new c621b7db181 IGNITE-28523 : Get rid of error bytes in 
DataStreamerResponse (#13006)
c621b7db181 is described below

commit c621b7db1817228baf41d299761474e4b086af5f
Author: Vladimir Steshin <[email protected]>
AuthorDate: Mon Apr 13 15:15:26 2026 +0300

    IGNITE-28523 : Get rid of error bytes in DataStreamerResponse (#13006)
---
 .../ignite/internal/CoreMessagesProvider.java      |  2 +-
 .../datastreamer/DataStreamProcessor.java          |  8 ----
 .../processors/datastreamer/DataStreamerImpl.java  | 34 ++++---------
 .../datastreamer/DataStreamerResponse.java         | 55 +++-------------------
 4 files changed, 17 insertions(+), 82 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
index 441b53aaacd..2996be0a3a4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
@@ -331,7 +331,7 @@ public class CoreMessagesProvider implements 
MessageFactoryProvider {
         msgIdx = 5000;
         // We don't use the code‑generated serializer for CompressedMessage - 
serialization is highly customized.
         factory.register(msgIdx++, CompressedMessage::new);
-        withNoSchema(ErrorMessage.class);
+        withSchema(ErrorMessage.class);
         withNoSchema(InetSocketAddressMessage.class);
         withNoSchema(InetAddressMessage.class);
         withNoSchema(TcpDiscoveryNode.class);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 4e237b12a26..9cdb2f424a3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -72,9 +72,6 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
     /** Marshaller. */
     private final Marshaller marsh;
 
-    /** */
-    private byte[] marshErrBytes;
-
     /**
      * @param ctx Kernal context.
      */
@@ -96,9 +93,6 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
-        marshErrBytes = U.marshal(marsh, new IgniteCheckedException("Failed to 
marshal response error, " +
-            "see node log for details."));
-
         flusher = U.newThread(new GridWorker(ctx.igniteInstanceName(), 
"grid-data-loader-flusher", log) {
             @Override protected void body() throws InterruptedException {
                 while (!isCancelled()) {
@@ -415,8 +409,6 @@ public class DataStreamProcessor<K, V> extends 
GridProcessorAdapter {
     private void sendResponse(UUID nodeId, Object resTopic, long reqId, 
@Nullable Throwable err) {
         DataStreamerResponse res = new DataStreamerResponse(reqId, err);
 
-        res.prepareMarshal(marsh, log, marshErrBytes);
-
         try {
             ctx.io().sendToCustomTopic(nodeId, resTopic, res, 
threadIoPolicy());
         }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 3eda0cb9707..a86b6240ed0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -2091,33 +2091,17 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
                 return;
             }
 
-            Throwable err = null;
+            Throwable err = res.error();
 
-            byte[] errBytes = res.errorBytes();
+            if (err != null) {
+                final String msg = "DataStreamer request failed [node=" + 
nodeId + "]";
 
-            if (errBytes != null) {
-                try {
-                    GridPeerDeployAware jobPda0 = jobPda;
-
-                    res.finishUnmarshal(ctx.marshaller(),
-                        U.resolveClassLoader(jobPda0 != null ? 
jobPda0.classLoader() : null, ctx.config()));
-
-                    final Throwable cause = res.error();
-
-                    final String msg = "DataStreamer request failed [node=" + 
nodeId + "]";
-
-                    if (cause instanceof ClusterTopologyCheckedException)
-                        err = new ClusterTopologyCheckedException(msg, cause);
-                    else if (X.hasCause(cause, 
IgniteClusterReadOnlyException.class))
-                        err = new IgniteClusterReadOnlyException(msg, cause);
-                    else
-                        err = new IgniteCheckedException(msg, cause);
-                }
-                catch (IgniteCheckedException e) {
-                    f.onDone(null, new IgniteCheckedException("Failed to 
unmarshal response.", e));
-
-                    return;
-                }
+                if (err instanceof ClusterTopologyCheckedException)
+                    err = new ClusterTopologyCheckedException(msg, err);
+                else if (X.hasCause(err, IgniteClusterReadOnlyException.class))
+                    err = new IgniteClusterReadOnlyException(msg, err);
+                else
+                    err = new IgniteCheckedException(msg, err);
             }
 
             f.onDone(null, err);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java
index 1638f4f3691..1704e73330f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java
@@ -17,12 +17,9 @@
 
 package org.apache.ignite.internal.processors.datastreamer;
 
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.jetbrains.annotations.Nullable;
 
@@ -34,12 +31,9 @@ public class DataStreamerResponse implements Message {
     @Order(0)
     long reqId;
 
-    /** */
-    private @Nullable Throwable err;
-
     /** */
     @Order(1)
-    @Nullable byte[] errBytes;
+    @Nullable ErrorMessage errMsg;
 
     /**
      * @param reqId Request ID.
@@ -47,7 +41,9 @@ public class DataStreamerResponse implements Message {
      */
     public DataStreamerResponse(long reqId, @Nullable Throwable err) {
         this.reqId = reqId;
-        this.err = err;
+
+        if (err != null)
+            errMsg = new ErrorMessage(err);
     }
 
     /**
@@ -64,48 +60,11 @@ public class DataStreamerResponse implements Message {
         return reqId;
     }
 
-    /**
-     * @return Error bytes.
-     */
-    public @Nullable byte[] errorBytes() {
-        return errBytes;
-    }
-
     /**
      * @return Error.
      */
-    public Throwable error() {
-        return err;
-    }
-
-    /**
-     * @param marsh Marshaller.
-     * @param log Logger.
-     * @param marshErrBytes Marshalled error bytes.
-     */
-    public void prepareMarshal(Marshaller marsh, IgniteLogger log, byte[] 
marshErrBytes) {
-        if (err != null && errBytes == null) {
-            try {
-                errBytes = U.marshal(marsh, err);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to marshal error [err=" + err + ", 
marshErr=" + e + ']', e);
-
-                errBytes = marshErrBytes;
-            }
-        }
-    }
-
-    /**
-     * @param marsh Marshaller.
-     * @param ldr Class loader.
-     */
-    public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws 
IgniteCheckedException {
-        if (errBytes != null && err == null) {
-            err = U.unmarshal(marsh, errBytes, ldr);
-
-            errBytes = null;
-        }
+    public @Nullable Throwable error() {
+        return ErrorMessage.error(errMsg);
     }
 
     /** {@inheritDoc} */

Reply via email to