This is an automated email from the ASF dual-hosted git repository.

zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 59107180be IGNITE-19788 Sql. Change QueryBatchMessage serialization (#2476)
59107180be is described below

commit 59107180be987aa0a4f56ea7bf5d4688756947fb
Author: Evgeniy Stanilovskiy <stanilov...@gmail.com>
AuthorDate: Mon Aug 28 14:26:40 2023 +0300

    IGNITE-19788 Sql. Change QueryBatchMessage serialization (#2476)
---
 .../src/integrationTest/sql/sqlite/join/join1.test |  4 +-
 .../internal/sql/engine/exec/ArrayRowHandler.java  | 14 +++++++
 .../internal/sql/engine/exec/ExchangeService.java  |  6 +--
 .../sql/engine/exec/ExchangeServiceImpl.java       | 11 +++---
 .../sql/engine/exec/LogicalRelImplementor.java     |  8 +++-
 .../internal/sql/engine/exec/RowHandler.java       | 43 +++++++++++++++++++++-
 .../sql/engine/exec/exp/ExpressionFactoryImpl.java |  4 +-
 .../engine/exec/exp/agg/AccumulatorsFactory.java   |  2 +-
 .../ignite/internal/sql/engine/exec/rel/Inbox.java | 16 +++++++-
 .../internal/sql/engine/exec/rel/Outbox.java       | 12 +++++-
 .../internal/sql/engine/externalize/RelJson.java   |  4 +-
 .../sql/engine/message/QueryBatchMessage.java      |  5 +--
 .../sql/engine/exec/rel/AbstractExecutionTest.java |  7 ++++
 .../sql/engine/exec/rel/ExchangeExecutionTest.java |  3 +-
 .../sql/engine/exec/rel/ExecutionTest.java         | 27 ++++++++++++++
 15 files changed, 140 insertions(+), 26 deletions(-)

diff --git a/modules/runner/src/integrationTest/sql/sqlite/join/join1.test b/modules/runner/src/integrationTest/sql/sqlite/join/join1.test
index 51d0795f61..7b6c043d59 100644
--- a/modules/runner/src/integrationTest/sql/sqlite/join/join1.test
+++ b/modules/runner/src/integrationTest/sql/sqlite/join/join1.test
@@ -145,12 +145,10 @@ SELECT * FROM t1 INNER JOIN t2 USING(b,c) ORDER BY t1.a;
 
 statement error
 SELECT * FROM t1 NATURAL JOIN t2 ON t1.a=t2.b;
-----
 
 statement error
 SELECT * FROM t1 JOIN t2 USING(a);
-----
 
 statement error
 SELECT * FROM t1 INNER OUTER JOIN t2;
-----
+
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ArrayRowHandler.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ArrayRowHandler.java
index e732aa704d..13d4ac5bd2 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ArrayRowHandler.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ArrayRowHandler.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
 import org.apache.ignite.internal.util.ArrayUtils;
+import org.apache.ignite.internal.util.ByteUtils;
 
 /**
  * Handler for rows that implemented as a simple objects array.
@@ -54,6 +56,12 @@ public class ArrayRowHandler implements RowHandler<Object[]> {
         return row.length;
     }
 
+    @Override
+    public ByteBuffer toByteBuffer(Object[] row) {
+        byte[] raw = ByteUtils.toBytes(row);
+        return ByteBuffer.wrap(raw);
+    }
+
     /** {@inheritDoc} */
     @Override
     public String toString(Object[] objects) {
@@ -85,6 +93,12 @@ public class ArrayRowHandler implements RowHandler<Object[]> {
 
                 return fields;
             }
+
+            /** {@inheritDoc} */
+            @Override
+            public Object[] create(ByteBuffer raw) {
+                return ByteUtils.fromBytes(raw.array());
+            }
         };
     }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
index e038afd632..4211794044 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -43,12 +44,11 @@ public interface ExchangeService extends LifecycleAware {
      * @param batchId The ID of the batch to which the data belongs.
      * @param last Indicates whether this is the last batch of data to be sent.
      * @param rows The data to be sent.
-     * @param <RowT> The type of the rows int the batch.
      * @return A {@link CompletableFuture future} representing the result of operation,
      *      which completes when the data has been sent.
      */
-    <RowT> CompletableFuture<Void> sendBatch(String nodeName, UUID queryId, long fragmentId, long exchangeId, int batchId, boolean last,
-            List<RowT> rows);
+    CompletableFuture<Void> sendBatch(String nodeName, UUID queryId, long fragmentId, long exchangeId, int batchId, boolean last,
+            List<ByteBuffer> rows);
 
     /**
      * Asynchronously requests data from the specified node.
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
index c616389368..b601cb329b 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.exec;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 import static org.apache.ignite.lang.IgniteStringFormatter.format;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -33,7 +34,6 @@ import org.apache.ignite.internal.sql.engine.message.QueryBatchMessage;
 import org.apache.ignite.internal.sql.engine.message.QueryBatchRequestMessage;
 import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
 import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
-import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.TraceableException;
@@ -74,8 +74,9 @@ public class ExchangeServiceImpl implements ExchangeService {
 
     /** {@inheritDoc} */
     @Override
-    public <RowT> CompletableFuture<Void> sendBatch(String nodeName, UUID qryId, long fragmentId, long exchangeId, int batchId,
-            boolean last, List<RowT> rows) {
+    public CompletableFuture<Void> sendBatch(String nodeName, UUID qryId, long fragmentId, long exchangeId, int batchId,
+            boolean last, List<ByteBuffer> rows) {
+
         return messageService.send(
                 nodeName,
                 FACTORY.queryBatchMessage()
@@ -84,7 +85,7 @@ public class ExchangeServiceImpl implements ExchangeService {
                         .exchangeId(exchangeId)
                         .batchId(batchId)
                         .last(last)
-                        .rows(Commons.cast(rows))
+                        .rows(rows)
                         .build()
         );
     }
@@ -162,7 +163,7 @@ public class ExchangeServiceImpl implements ExchangeService {
 
         if (inbox != null) {
             try {
-                inbox.onBatchReceived(nodeName, msg.batchId(), msg.last(), Commons.cast(msg.rows()));
+                inbox.onBatchReceived(nodeName, msg.batchId(), msg.last(), msg.rows());
             } catch (Throwable e) {
                 inbox.onError(e);
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index d894685874..eab01147f6 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -620,9 +620,15 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
     /** {@inheritDoc} */
     @Override
     public Node<RowT> visit(IgniteReceiver rel) {
+        RelDataType rowType = rel.getRowType();
+
+        RowSchema rowSchema = rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType));
+
+        RowFactory<RowT> rowFactory = ctx.rowHandler().factory(rowSchema);
+
         Inbox<RowT> inbox = new Inbox<>(ctx, exchangeSvc, mailboxRegistry,
                 ctx.remotes(rel.exchangeId()), expressionFactory.comparator(rel.collation()),
-                rel.exchangeId(), rel.sourceFragmentId());
+                rowFactory, rel.exchangeId(), rel.sourceFragmentId());
 
         mailboxRegistry.register(inbox);
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
index de8d4a7b2b..28ccfafe8b 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RowHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
+import java.nio.ByteBuffer;
 import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
 import org.jetbrains.annotations.Nullable;
 
@@ -24,29 +25,67 @@ import org.jetbrains.annotations.Nullable;
  * Universal accessor and mutator for rows. It also has factory methods.
  */
 public interface RowHandler<RowT> {
+    /**
+     * Extract appropriate field.
+     *
+     * @param field Field position to be processed.
+     * @param row Object to be extracted from.
+     */
     @Nullable Object get(int field, RowT row);
 
+    /** Set incoming row field.
+     *
+     * @param field Field position to be processed.
+     * @param row Row which field need to be changed.
+     * @param val Value which should be set.
+     */
     void set(int field, RowT row, @Nullable Object val);
 
+    /** Concatenate two rows. */
     RowT concat(RowT left, RowT right);
 
+    /** Return column count contained in the incoming row. */
     int columnCount(RowT row);
 
+    /**
+     * Assembly row representation as ByteBuffer.
+     *
+     * @param row Incoming data to be processed.
+     * @return {@link ByteBuffer} representation.
+     */
+    ByteBuffer toByteBuffer(RowT row);
+
+    /** String representation. */
     String toString(RowT row);
 
     /** Creates a factory that produces rows with fields defined by the given schema. */
     RowFactory<RowT> factory(RowSchema rowSchema);
 
     /**
-     * RowFactory interface.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Provide methods for inner row assembly.
      */
     @SuppressWarnings("PublicInnerClass")
     interface RowFactory<RowT> {
+        /** Return row accessor and mutator implementation. */
         RowHandler<RowT> handler();
 
+        /** Create empty row. */
         RowT create();
 
+        /**
+         * Create row using incoming objects.
+         *
+         * @param fields Sequential objects definitions output row will be created from.
+         * @return Instantiation defined representation.
+         */
         RowT create(Object... fields);
+
+        /**
+         * Create row using incoming {@link ByteBuffer}.
+         *
+         * @param raw {@link ByteBuffer} representation.
+         * @return Instantiation defined representation.
+         */
+        RowT create(ByteBuffer raw);
     }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java
index f1416671a2..ed38d522c3 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ExpressionFactoryImpl.java
@@ -646,7 +646,7 @@ public class ExpressionFactoryImpl<RowT> implements ExpressionFactory<RowT> {
         public boolean test(RowT r) {
             scalar.execute(ctx, r, out);
 
-            return Boolean.TRUE == hnd.get(0, out);
+            return Boolean.TRUE.equals(hnd.get(0, out));
         }
     }
 
@@ -662,7 +662,7 @@ public class ExpressionFactoryImpl<RowT> implements ExpressionFactory<RowT> {
         @Override
         public boolean test(RowT r1, RowT r2) {
             scalar.execute(ctx, r1, r2, out);
-            return Boolean.TRUE == hnd.get(0, out);
+            return Boolean.TRUE.equals(hnd.get(0, out));
         }
     }
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java
index 3ab0b36f6d..c14fb4255d 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java
@@ -280,7 +280,7 @@ public class AccumulatorsFactory<RowT> implements Supplier<List<AccumulatorWrapp
         /** {@inheritDoc} */
         @Override
         public void add(RowT row) {
-            if (type != AggregateType.REDUCE && filterArg >= 0 && Boolean.TRUE != handler.get(filterArg, row)) {
+            if (type != AggregateType.REDUCE && filterArg >= 0 && !Boolean.TRUE.equals(handler.get(filterArg, row))) {
                 return;
             }
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
index 3fa7f8974a..992dd2e259 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
 import static org.apache.calcite.util.Util.unexpected;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
@@ -33,6 +34,7 @@ import org.apache.ignite.internal.sql.engine.NodeLeftException;
 import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.sql.engine.exec.SharedState;
 import org.apache.ignite.internal.sql.engine.exec.rel.Inbox.RemoteSource.State;
 import org.apache.ignite.internal.util.ExceptionUtils;
@@ -52,6 +54,7 @@ public class Inbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, Si
     private final Collection<String> srcNodeNames;
     private final @Nullable Comparator<RowT> comp;
     private final Map<String, RemoteSource<RowT>> perNodeBuffers;
+    private final RowFactory<RowT> rowFactory;
 
     private @Nullable List<RemoteSource<RowT>> remoteSources;
     private int requested;
@@ -63,6 +66,7 @@ public class Inbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, Si
      * @param ctx Execution context.
      * @param exchange Exchange service.
      * @param registry Mailbox registry.
+     * @param rowFactory Incoming row factory.
      * @param exchangeId Exchange ID.
      * @param srcFragmentId Source fragment ID.
      */
@@ -72,6 +76,7 @@ public class Inbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, Si
             MailboxRegistry registry,
             Collection<String> srcNodeNames,
             @Nullable Comparator<RowT> comp,
+            RowFactory<RowT> rowFactory,
             long exchangeId,
             long srcFragmentId
     ) {
@@ -83,6 +88,7 @@ public class Inbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, Si
         this.registry = registry;
         this.srcNodeNames = srcNodeNames;
         this.comp = comp;
+        this.rowFactory = rowFactory;
 
         this.srcFragmentId = srcFragmentId;
         this.exchangeId = exchangeId;
@@ -157,12 +163,18 @@ public class Inbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, Si
      * @param last Last batch flag.
      * @param rows Rows.
      */
-    public void onBatchReceived(String srcNodeName, int batchId, boolean last, List<RowT> rows) throws Exception {
+    public void onBatchReceived(String srcNodeName, int batchId, boolean last, List<ByteBuffer> rows) throws Exception {
         RemoteSource<RowT> source = perNodeBuffers.get(srcNodeName);
 
         boolean waitingBefore = source.check() == State.WAITING;
 
-        source.onBatchReceived(batchId, last, rows);
+        List<RowT> rows0 = new ArrayList<>(rows.size());
+
+        for (ByteBuffer row : rows) {
+            rows0.add(rowFactory.create(row));
+        }
+
+        source.onBatchReceived(batchId, last, rows0);
 
         if (requested > 0 && waitingBefore && source.check() != State.WAITING) {
             push();
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
index 387a25dbbc..2dc2b3e8f5 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
 
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Deque;
@@ -32,6 +33,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.SharedState;
 import org.apache.ignite.internal.sql.engine.trait.Destination;
 import org.apache.ignite.internal.sql.engine.util.Commons;
@@ -232,7 +234,15 @@ public class Outbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, S
     }
 
     private void sendBatch(String nodeName, int batchId, boolean last, List<RowT> rows) {
-        exchange.sendBatch(nodeName, queryId(), targetFragmentId, exchangeId, batchId, last, rows)
+        RowHandler<RowT> handler = context().rowHandler();
+
+        List<ByteBuffer> rows0 = new ArrayList<>(rows.size());
+
+        for (RowT row : rows) {
+            rows0.add(handler.toByteBuffer(row));
+        }
+
+        exchange.sendBatch(nodeName, queryId(), targetFragmentId, exchangeId, batchId, last, rows0)
                 .whenComplete((ignored, ex) -> {
                     if (ex == null) {
                         return;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJson.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJson.java
index e9353b4b10..d755d215b5 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJson.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJson.java
@@ -705,7 +705,7 @@ class RelJson {
         } else if (o instanceof Map) {
             Map<String, Object> map = (Map<String, Object>) o;
             String clazz = (String) map.get("class");
-            boolean nullable = Boolean.TRUE == map.get("nullable");
+            boolean nullable = Boolean.TRUE.equals(map.get("nullable"));
 
             if (clazz != null) {
                 RelDataType type = typeFactory.createJavaType(classForName(clazz, false));
@@ -826,7 +826,7 @@ class RelJson {
                 // Check if it is a local ref.
                 if (map.containsKey("type")) {
                     RelDataType type = toType(typeFactory, map.get("type"));
-                    return map.get("dynamic") == Boolean.TRUE
+                    return Boolean.TRUE.equals(map.get("dynamic"))
                             ? rexBuilder.makeDynamicParam(type, input)
                             : rexBuilder.makeLocalRef(type, input);
                 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchMessage.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchMessage.java
index 058f52cf74..b1cc051270 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchMessage.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchMessage.java
@@ -17,8 +17,8 @@
 
 package org.apache.ignite.internal.sql.engine.message;
 
+import java.nio.ByteBuffer;
 import java.util.List;
-import org.apache.ignite.network.annotations.Marshallable;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
@@ -45,6 +45,5 @@ public interface QueryBatchMessage extends ExecutionContextAwareMessage {
     /**
      * Get rows.
      */
-    @Marshallable
-    List<Object> rows();
+    List<ByteBuffer> rows();
 }
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index 87c1b2bb38..2b2670b423 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.sql.engine.exec.rel;
 
 import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.Iterator;
@@ -47,6 +48,7 @@ import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.thread.LogUncaughtExceptionHandler;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
+import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Pair;
 import org.apache.ignite.network.ClusterNodeImpl;
 import org.apache.ignite.network.NetworkAddress;
@@ -343,6 +345,11 @@ public abstract class AbstractExecutionTest extends IgniteAbstractTest {
             public Object[] create(Object... fields) {
                 return fields;
             }
+
+            @Override
+            public Object[] create(ByteBuffer raw) {
+                return ByteUtils.fromBytes(raw.array());
+            }
         };
     }
 }
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
index 07f230d083..dfbfa5704b 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
@@ -480,7 +480,8 @@ public class ExchangeExecutionTest extends AbstractExecutionTest {
                 createExchangeService(taskExecutor, serviceFactory.forNode(localNode.name()), mailboxRegistry));
 
         Inbox<Object[]> inbox = new Inbox<>(
-                targetCtx, exchangeService, mailboxRegistry, sourceNodeNames, comparator, SOURCE_FRAGMENT_ID, SOURCE_FRAGMENT_ID
+                targetCtx, exchangeService, mailboxRegistry, sourceNodeNames, comparator, rowFactory(),
+                SOURCE_FRAGMENT_ID, SOURCE_FRAGMENT_ID
         );
 
         mailboxRegistry.register(inbox);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
index ca9bda281b..e5316694eb 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
@@ -26,23 +26,27 @@ import static org.apache.calcite.rel.core.JoinRelType.LEFT;
 import static org.apache.calcite.rel.core.JoinRelType.RIGHT;
 import static org.apache.calcite.rel.core.JoinRelType.SEMI;
 import static org.apache.ignite.internal.sql.engine.util.Commons.getFieldFromBiRows;
+import static org.apache.ignite.internal.sql.engine.util.TypeUtils.rowSchemaFromRelTypes;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static org.apache.ignite.internal.util.ArrayUtils.asList;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Stream;
+import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.Commons;
@@ -119,6 +123,29 @@ public class ExecutionTest extends AbstractExecutionTest {
         assertArrayEquals(new Object[]{2, "Ivan", "Ignite"}, rows.get(1));
     }
 
+    @Test
+    public void testRowFactoryAssembly() {
+        ExecutionContext<Object[]> ctx = executionContext(false);
+
+        RelDataType rowType = TypeUtils.createRowType(ctx.getTypeFactory(), int.class, String.class, boolean.class);
+
+        RowSchema rowSchema = rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType));
+
+        RowFactory<Object[]> rowFactory = ctx.rowHandler().factory(rowSchema);
+
+        Object[] row1 = rowFactory.create();
+
+        ctx.rowHandler().set(0, row1, 1);
+        ctx.rowHandler().set(1, row1, "2");
+        ctx.rowHandler().set(2, row1, false);
+
+        ByteBuffer bb = ctx.rowHandler().toByteBuffer(row1);
+
+        Object[] row2 = rowFactory.create(bb);
+
+        assertArrayEquals(row1, row2);
+    }
+
     @Test
     public void testUnionAll() {
         ExecutionContext<Object[]> ctx = executionContext(true);

Reply via email to