This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c621b7db181 IGNITE-28523 : Get rid of error bytes in
DataStreamerResponse (#13006)
c621b7db181 is described below
commit c621b7db1817228baf41d299761474e4b086af5f
Author: Vladimir Steshin <[email protected]>
AuthorDate: Mon Apr 13 15:15:26 2026 +0300
IGNITE-28523 : Get rid of error bytes in DataStreamerResponse (#13006)
---
.../ignite/internal/CoreMessagesProvider.java | 2 +-
.../datastreamer/DataStreamProcessor.java | 8 ----
.../processors/datastreamer/DataStreamerImpl.java | 34 ++++---------
.../datastreamer/DataStreamerResponse.java | 55 +++-------------------
4 files changed, 17 insertions(+), 82 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
index 441b53aaacd..2996be0a3a4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
@@ -331,7 +331,7 @@ public class CoreMessagesProvider implements
MessageFactoryProvider {
msgIdx = 5000;
// We don't use the code‑generated serializer for CompressedMessage -
serialization is highly customized.
factory.register(msgIdx++, CompressedMessage::new);
- withNoSchema(ErrorMessage.class);
+ withSchema(ErrorMessage.class);
withNoSchema(InetSocketAddressMessage.class);
withNoSchema(InetAddressMessage.class);
withNoSchema(TcpDiscoveryNode.class);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 4e237b12a26..9cdb2f424a3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -72,9 +72,6 @@ public class DataStreamProcessor<K, V> extends
GridProcessorAdapter {
/** Marshaller. */
private final Marshaller marsh;
- /** */
- private byte[] marshErrBytes;
-
/**
* @param ctx Kernal context.
*/
@@ -96,9 +93,6 @@ public class DataStreamProcessor<K, V> extends
GridProcessorAdapter {
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
- marshErrBytes = U.marshal(marsh, new IgniteCheckedException("Failed to
marshal response error, " +
- "see node log for details."));
-
flusher = U.newThread(new GridWorker(ctx.igniteInstanceName(),
"grid-data-loader-flusher", log) {
@Override protected void body() throws InterruptedException {
while (!isCancelled()) {
@@ -415,8 +409,6 @@ public class DataStreamProcessor<K, V> extends
GridProcessorAdapter {
private void sendResponse(UUID nodeId, Object resTopic, long reqId,
@Nullable Throwable err) {
DataStreamerResponse res = new DataStreamerResponse(reqId, err);
- res.prepareMarshal(marsh, log, marshErrBytes);
-
try {
ctx.io().sendToCustomTopic(nodeId, resTopic, res,
threadIoPolicy());
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 3eda0cb9707..a86b6240ed0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -2091,33 +2091,17 @@ public class DataStreamerImpl<K, V> implements
IgniteDataStreamer<K, V>, Delayed
return;
}
- Throwable err = null;
+ Throwable err = res.error();
- byte[] errBytes = res.errorBytes();
+ if (err != null) {
+ final String msg = "DataStreamer request failed [node=" +
nodeId + "]";
- if (errBytes != null) {
- try {
- GridPeerDeployAware jobPda0 = jobPda;
-
- res.finishUnmarshal(ctx.marshaller(),
- U.resolveClassLoader(jobPda0 != null ?
jobPda0.classLoader() : null, ctx.config()));
-
- final Throwable cause = res.error();
-
- final String msg = "DataStreamer request failed [node=" +
nodeId + "]";
-
- if (cause instanceof ClusterTopologyCheckedException)
- err = new ClusterTopologyCheckedException(msg, cause);
- else if (X.hasCause(cause,
IgniteClusterReadOnlyException.class))
- err = new IgniteClusterReadOnlyException(msg, cause);
- else
- err = new IgniteCheckedException(msg, cause);
- }
- catch (IgniteCheckedException e) {
- f.onDone(null, new IgniteCheckedException("Failed to
unmarshal response.", e));
-
- return;
- }
+ if (err instanceof ClusterTopologyCheckedException)
+ err = new ClusterTopologyCheckedException(msg, err);
+ else if (X.hasCause(err, IgniteClusterReadOnlyException.class))
+ err = new IgniteClusterReadOnlyException(msg, err);
+ else
+ err = new IgniteCheckedException(msg, err);
}
f.onDone(null, err);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java
index 1638f4f3691..1704e73330f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java
@@ -17,12 +17,9 @@
package org.apache.ignite.internal.processors.datastreamer;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
@@ -34,12 +31,9 @@ public class DataStreamerResponse implements Message {
@Order(0)
long reqId;
- /** */
- private @Nullable Throwable err;
-
/** */
@Order(1)
- @Nullable byte[] errBytes;
+ @Nullable ErrorMessage errMsg;
/**
* @param reqId Request ID.
@@ -47,7 +41,9 @@ public class DataStreamerResponse implements Message {
*/
public DataStreamerResponse(long reqId, @Nullable Throwable err) {
this.reqId = reqId;
- this.err = err;
+
+ if (err != null)
+ errMsg = new ErrorMessage(err);
}
/**
@@ -64,48 +60,11 @@ public class DataStreamerResponse implements Message {
return reqId;
}
- /**
- * @return Error bytes.
- */
- public @Nullable byte[] errorBytes() {
- return errBytes;
- }
-
/**
* @return Error.
*/
- public Throwable error() {
- return err;
- }
-
- /**
- * @param marsh Marshaller.
- * @param log Logger.
- * @param marshErrBytes Marshalled error bytes.
- */
- public void prepareMarshal(Marshaller marsh, IgniteLogger log, byte[]
marshErrBytes) {
- if (err != null && errBytes == null) {
- try {
- errBytes = U.marshal(marsh, err);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to marshal error [err=" + err + ",
marshErr=" + e + ']', e);
-
- errBytes = marshErrBytes;
- }
- }
- }
-
- /**
- * @param marsh Marshaller.
- * @param ldr Class loader.
- */
- public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws
IgniteCheckedException {
- if (errBytes != null && err == null) {
- err = U.unmarshal(marsh, errBytes, ldr);
-
- errBytes = null;
- }
+ public @Nullable Throwable error() {
+ return ErrorMessage.error(errMsg);
}
/** {@inheritDoc} */