This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new bdd78f789e5 IGNITE-23033 .NET: Support tuples with schemas in Compute 
and Streamer (#5793)
bdd78f789e5 is described below

commit bdd78f789e5cada95b7e867b945f2ac49f3dbf33
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri May 9 13:06:55 2025 +0300

    IGNITE-23033 .NET: Support tuples with schemas in Compute and Streamer 
(#5793)
    
    Allow `IIgniteTuple` (including nested tuples) as:
    * Compute: args, results
    * Streamer with receiver: payload, args, results
---
 .../Apache.Ignite.Tests/Compute/ComputeTests.cs    |  38 +++++++
 .../Compute/PlatformComputeTests.cs                |  92 ++++++++++------
 .../Apache.Ignite.Tests/Compute/TestCases.cs       |  92 ++++++++++++++++
 .../Apache.Ignite.Tests/Table/DataStreamerTests.cs |  87 ++++++++++-----
 .../Internal/Compute/ComputePacker.cs              |  11 +-
 .../Proto/BinaryTuple/BinaryTupleBuilder.cs        |  92 ++++++++++++++++
 .../Internal/Proto/MsgPack/MsgPackWriter.cs        |   2 -
 .../Internal/Table/DataStreamerWithReceiver.cs     |  40 ++++++-
 .../Serialization/TupleWithSchemaMarshalling.cs    | 120 +++++++++++++++++++++
 9 files changed, 511 insertions(+), 63 deletions(-)

diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 387f38b40b9..7c6a97d1755 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -895,6 +895,44 @@ namespace Apache.Ignite.Tests.Compute
             Assert.IsNull(nullRes);
         }
 
+        [Test]
+        public async Task TestTupleWithSchemaRoundTrip()
+        {
+            var tuple = TestCases.GetTupleWithAllFieldTypes(x => x is not 
decimal);
+            tuple["nested_tuple"] = TestCases.GetTupleWithAllFieldTypes(x => x 
is not decimal);
+
+            var nodes = JobTarget.AnyNode(await Client.GetClusterNodesAsync());
+            IJobExecution<object> resExec = await 
Client.Compute.SubmitAsync(nodes, EchoJob, tuple);
+            var res = await resExec.GetResultAsync();
+
+            Assert.AreEqual(tuple, res);
+        }
+
+        [Test]
+        public async Task TestDeepNestedTupleWithSchemaRoundTrip()
+        {
+            var tuple = TestCases.GetNestedTuple(100);
+
+            var nodes = JobTarget.AnyNode(await Client.GetClusterNodesAsync());
+            IJobExecution<object> resExec = await 
Client.Compute.SubmitAsync(nodes, EchoJob, tuple);
+            var res = await resExec.GetResultAsync();
+
+            Assert.AreEqual(tuple, res);
+            StringAssert.Contains("CHILD99 = IgniteTuple { ID = 99, CHILD100 = 
IgniteTuple { ID = 100 } } } } } } } }", res.ToString());
+        }
+
+        [Test]
+        public async Task TestDeepNestedTupleWithSchemaToString()
+        {
+            var tuple = TestCases.GetNestedTuple(100);
+
+            var nodes = JobTarget.AnyNode(await Client.GetClusterNodesAsync());
+            IJobExecution<string> resExec = await 
Client.Compute.SubmitAsync(nodes, ToStringJob, tuple);
+            var res = await resExec.GetResultAsync();
+
+            StringAssert.Contains("[ID=97, CHILD98=TupleImpl [ID=98, 
CHILD99=TupleImpl [ID=99, CHILD100=TupleImpl [ID=100]]]]]]]]", res);
+        }
+
         private static async Task AssertJobStatus<T>(IJobExecution<T> 
jobExecution, JobStatus status, Instant beforeStart)
         {
             JobState? state = await jobExecution.GetStateAsync();
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/PlatformComputeTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/PlatformComputeTests.cs
index cc15099847a..45246635059 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/PlatformComputeTests.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/PlatformComputeTests.cs
@@ -24,13 +24,22 @@ using System.Linq;
 using System.Threading.Tasks;
 using Ignite.Compute;
 using Ignite.Marshalling;
+using Ignite.Table;
 using Network;
-using NodaTime;
 using NUnit.Framework;
 using TestHelpers;
 
 /// <summary>
 /// Tests for platform compute (non-Java jobs).
+/// <para />
+/// Development:
+/// - Changing test code, job code: no need to restart Ignite.
+/// - Changing core code: restart Ignite servers and do a full .NET solution 
rebuild to reflect the changes in .NET compute executor.
+/// <para />
+/// Debugging:
+/// - Run tests once so that .NET executor processes are started.
+/// - Attach to the executor processes.
+/// - Run tests again to debug the executor.
 /// </summary>
 public class PlatformComputeTests : IgniteTestsBase
 {
@@ -64,7 +73,28 @@ public class PlatformComputeTests : IgniteTestsBase
     }
 
     [Test]
-    [TestCaseSource(nameof(ArgTypesTestCases))]
+    public async Task TestBroadcastJob()
+    {
+        var jobDesc = DotNetJobs.Echo with { DeploymentUnits = 
[_defaultTestUnit] };
+        var jobTarget = BroadcastJobTarget.Nodes(
+            await GetClusterNodeAsync(),
+            await GetClusterNodeAsync("_2"),
+            await GetClusterNodeAsync("_3"));
+
+        var jobExec = await Client.Compute.SubmitBroadcastAsync(
+            jobTarget,
+            jobDesc,
+            "Hello world!");
+
+        foreach (var job in jobExec.JobExecutions)
+        {
+            var res = await job.GetResultAsync();
+            Assert.AreEqual("Hello world!", res);
+        }
+    }
+
+    [Test]
+    [TestCaseSource(typeof(TestCases), nameof(TestCases.SupportedArgs))]
     public async Task TestAllSupportedArgTypes(object val)
     {
         var result = await ExecJobAsync(DotNetJobs.Echo, val);
@@ -216,6 +246,35 @@ public class PlatformComputeTests : IgniteTestsBase
         Assert.AreEqual(2, assemblyLoadContextCount);
     }
 
+    [Test]
+    public async Task TestTupleWithSchemaRoundTrip()
+    {
+        var tuple = TestCases.GetTupleWithAllFieldTypes();
+        tuple["nested_tuple"] = TestCases.GetTupleWithAllFieldTypes(x => x is 
not decimal);
+
+        var expectedTuple = Enumerable.Range(0, tuple.FieldCount).Aggregate(
+            seed: new IgniteTuple(),
+            (acc, i) =>
+            {
+                acc[tuple.GetName(i)] = tuple[i] is decimal d ? new 
BigDecimal(d) : tuple[i];
+                return acc;
+            });
+
+        var res = (IIgniteTuple)(await ExecJobAsync(DotNetJobs.Echo, tuple))!;
+
+        Assert.AreEqual(expectedTuple, res);
+    }
+
+    [Test]
+    public async Task TestDeepNestedTupleWithSchemaRoundTrip()
+    {
+        var tuple = TestCases.GetNestedTuple(100);
+        var res = await ExecJobAsync(DotNetJobs.Echo, tuple);
+
+        Assert.AreEqual(tuple, res);
+        StringAssert.Contains("CHILD99 = IgniteTuple { ID = 99, CHILD100 = 
IgniteTuple { ID = 100 } } } } } } } }", res?.ToString());
+    }
+
     private static async Task<DeploymentUnit> DeployTestsAssembly(string? 
unitId = null, string? unitVersion = null)
     {
         var testsDll = typeof(PlatformComputeTests).Assembly.Location;
@@ -231,35 +290,6 @@ public class PlatformComputeTests : IgniteTestsBase
         return new DeploymentUnit(unitId0, unitVersion0);
     }
 
-    private static IEnumerable<object> ArgTypesTestCases() => [
-        sbyte.MinValue,
-        sbyte.MaxValue,
-        short.MinValue,
-        short.MaxValue,
-        int.MinValue,
-        int.MaxValue,
-        long.MinValue,
-        long.MaxValue,
-        float.MinValue,
-        float.MaxValue,
-        double.MinValue,
-        double.MaxValue,
-        123.456m,
-        -123.456m,
-        decimal.MinValue,
-        decimal.MaxValue,
-        new BigDecimal(long.MinValue, 10),
-        new BigDecimal(long.MaxValue, 20),
-        new byte[] { 1, 255 },
-        "Ignite 🔥",
-        LocalDate.MinIsoValue,
-        LocalTime.Noon,
-        LocalDateTime.MaxIsoValue,
-        Instant.FromUtc(2001, 3, 4, 5, 6),
-        Guid.Empty,
-        new Guid(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 
15, 16 }),
-    ];
-
     private async Task<IClusterNode> GetClusterNodeAsync(string? suffix = null)
     {
         var nodeName = ComputeTests.PlatformTestNodeRunner + suffix;
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/TestCases.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/TestCases.cs
new file mode 100644
index 00000000000..e83168ddd68
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/TestCases.cs
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests.Compute;
+
+using System;
+using System.Collections.Generic;
+using Ignite.Table;
+using NodaTime;
+
+/// <summary>
+/// Compute test cases.
+/// </summary>
+public static class TestCases
+{
+    public static List<object> SupportedArgs => [
+        sbyte.MinValue,
+        sbyte.MaxValue,
+        short.MinValue,
+        short.MaxValue,
+        int.MinValue,
+        int.MaxValue,
+        long.MinValue,
+        long.MaxValue,
+        float.MinValue,
+        float.MaxValue,
+        double.MinValue,
+        double.MaxValue,
+        123.456m,
+        -123.456m,
+        decimal.MinValue,
+        decimal.MaxValue,
+        new BigDecimal(long.MinValue, 10),
+        new BigDecimal(long.MaxValue, 20),
+        new byte[] { 1, 255 },
+        "Ignite 🔥",
+        LocalDate.MinIsoValue,
+        LocalTime.Noon,
+        LocalDateTime.MaxIsoValue,
+        Instant.FromUtc(2001, 3, 4, 5, 6),
+        Guid.Empty,
+        new Guid(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 
15, 16 }),
+    ];
+
+    public static IIgniteTuple GetTupleWithAllFieldTypes(Func<object, bool>? 
filter = null)
+    {
+        var res = new IgniteTuple();
+
+        for (var i = 0; i < SupportedArgs.Count; i++)
+        {
+            var val = SupportedArgs[i];
+
+            if (filter != null && !filter(val))
+            {
+                continue;
+            }
+
+            res[$"v{i}"] = val;
+        }
+
+        return res;
+    }
+
+    public static IIgniteTuple GetNestedTuple(int depth)
+    {
+        var res = new IgniteTuple { ["id"] = "root" };
+        var current = res;
+
+        for (var i = 1; i <= depth; i++)
+        {
+            var nested = new IgniteTuple { ["id"] = i };
+            current[$"child{i}"] = nested;
+            current = nested;
+        }
+
+        return res;
+    }
+}
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
index c343a8efafd..a256558e498 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
@@ -40,6 +40,8 @@ public class DataStreamerTests : IgniteTestsBase
 {
     private const string TestReceiverClassName = 
ComputeTests.PlatformTestNodeRunner + "$TestReceiver";
 
+    private const string EchoReceiverClassName = 
ComputeTests.PlatformTestNodeRunner + "$EchoReceiver";
+
     private const string EchoArgsReceiverClassName = 
ComputeTests.PlatformTestNodeRunner + "$EchoArgsReceiver";
 
     private const string UpsertElementTypeNameReceiverClassName = 
ComputeTests.PlatformTestNodeRunner + "$UpsertElementTypeNameReceiver";
@@ -54,29 +56,9 @@ public class DataStreamerTests : IgniteTestsBase
 
     private static readonly ReceiverDescriptor<object?> TestReceiverNoResults 
= new(TestReceiverClassName);
 
-    private static readonly ReceiverDescriptor<object, object> 
EchoArgsReceiver = new(EchoArgsReceiverClassName);
+    private static readonly ReceiverDescriptor<object?, object> EchoReceiver = 
new(EchoReceiverClassName);
 
-    private static readonly object[] AllSupportedTypes =
-    {
-        true,
-        sbyte.MaxValue,
-        short.MinValue,
-        int.MaxValue,
-        long.MinValue,
-        float.MaxValue,
-        double.MinValue,
-        decimal.One,
-        new BigDecimal(1234, 2),
-        new LocalDate(1234, 5, 6),
-        new LocalTime(12, 3, 4, 567),
-        new LocalDateTime(1234, 5, 6, 7, 8, 9),
-        Instant.FromUnixTimeSeconds(123456),
-        Guid.Empty,
-        "str123",
-        new byte[] { 1, 2, 3 },
-        Period.FromDays(999),
-        Duration.FromSeconds(12345),
-    };
+    private static readonly ReceiverDescriptor<object, object> 
EchoArgsReceiver = new(EchoArgsReceiverClassName);
 
     private static int _unknownKey = 333000;
 
@@ -805,8 +787,26 @@ public class DataStreamerTests : IgniteTestsBase
             ex.Message);
     }
 
-    [TestCaseSource(nameof(AllSupportedTypes))]
-    public async Task TestEchoReceiverAllDataTypes(object arg)
+    [TestCaseSource(typeof(TestCases), nameof(TestCases.SupportedArgs))]
+    public async Task TestEchoReceiverAllDataTypes(object payload)
+    {
+        var res = await PocoView.StreamDataAsync<object, object, object?, 
object>(
+            new[] { payload }.ToAsyncEnumerable(),
+            keySelector: _ => new Poco(),
+            payloadSelector: x => x,
+            EchoReceiver,
+            receiverArg: null).SingleAsync();
+
+        if (payload is decimal dec)
+        {
+            payload = new BigDecimal(dec);
+        }
+
+        Assert.AreEqual(payload, res);
+    }
+
+    [TestCaseSource(typeof(TestCases), nameof(TestCases.SupportedArgs))]
+    public async Task TestEchoArgsReceiverAllDataTypes(object arg)
     {
         var res = await PocoView.StreamDataAsync<object, object, object, 
object>(
             new object[] { 1 }.ToAsyncEnumerable(),
@@ -823,6 +823,45 @@ public class DataStreamerTests : IgniteTestsBase
         Assert.AreEqual(arg, res);
     }
 
+    [Test]
+    public async Task TestEchoReceiverTuple()
+    {
+        var count = 5_000;
+
+        var payload = TestCases.GetTupleWithAllFieldTypes(x => x is not 
decimal);
+        payload["nested"] = new IgniteTuple { ["foo"] = "bar" };
+
+        List<object> res = await PocoView.StreamDataAsync<object, object, 
object?, object>(
+            Enumerable.Range(1, count).Select(x => 
payload).ToAsyncEnumerable(),
+            keySelector: _ => new Poco(),
+            payloadSelector: x => x,
+            EchoReceiver,
+            receiverArg: null).ToListAsync();
+
+        Assert.AreEqual(count, res.Count);
+
+        for (int i = 0; i < count; i++)
+        {
+            Assert.AreEqual(payload, res[i]);
+        }
+    }
+
+    [Test]
+    public async Task TestEchoArgsReceiverTuple()
+    {
+        var arg = TestCases.GetTupleWithAllFieldTypes(x => x is not decimal);
+        arg["nested"] = new IgniteTuple { ["foo"] = "bar" };
+
+        var res = await PocoView.StreamDataAsync<object, object, object, 
object>(
+            new object[] { 1 }.ToAsyncEnumerable(),
+            keySelector: _ => new Poco(),
+            payloadSelector: x => x.ToString()!,
+            EchoArgsReceiver,
+            receiverArg: arg).SingleAsync();
+
+        Assert.AreEqual(arg, res);
+    }
+
     [Test]
     public async Task TestResultConsumerEarlyExit()
     {
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs
index 165d05f8bad..0198924c163 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs
@@ -22,6 +22,7 @@ using System.Buffers;
 using Ignite.Table;
 using Marshalling;
 using Proto.MsgPack;
+using Table.Serialization;
 
 /// <summary>
 /// Compute packer utils.
@@ -68,13 +69,14 @@ internal static class ComputePacker
             return;
         }
 
-        if (obj is IIgniteTuple)
+        if (obj is IIgniteTuple tuple)
         {
-            // TODO: IGNITE-23033 .NET: Thin 3.0: Support tuples with schemas 
in Compute
             w.Write(Tuple);
-            throw new NotImplementedException("IGNITE-23033");
+            w.Write(static (bufWriter, arg) => 
TupleWithSchemaMarshalling.Pack(bufWriter, arg), tuple);
+            return;
         }
 
+        // TODO IGNITE-25337 Automatic POCO serialization.
         w.Write(Native);
         w.WriteObjectAsBinaryTuple(obj);
     }
@@ -95,9 +97,10 @@ internal static class ComputePacker
 
         int type = r.ReadInt32();
 
+        // TODO IGNITE-25337 Automatic POCO serialization.
         return type switch
         {
-            Tuple => throw new NotImplementedException("IGNITE-23033"),
+            Tuple => 
(T)(object)TupleWithSchemaMarshalling.Unpack(r.ReadBinary()),
             MarshallerObject => Unmarshal(ref r, marshaller),
             _ => (T)r.ReadObjectFromBinaryTuple()!
         };
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
index 986d1d9ec86..5ec5f7316b8 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
@@ -998,6 +998,98 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
             }
         }
 
+        /// <summary>
+        /// Appends an object without type code and returns type.
+        /// </summary>
+        /// <param name="value">Value.</param>
+        /// <returns>Resulting column type.</returns>
+        public ColumnType AppendObjectAndGetType(
+            object? value)
+        {
+            switch (value)
+            {
+                case null:
+                    AppendNull(); // Value.
+                    return ColumnType.Null;
+
+                case bool b:
+                    AppendBool(b);
+                    return ColumnType.Boolean;
+
+                case int i32:
+                    AppendInt(i32);
+                    return ColumnType.Int32;
+
+                case long i64:
+                    AppendLong(i64);
+                    return ColumnType.Int64;
+
+                case string str:
+                    AppendString(str);
+                    return ColumnType.String;
+
+                case Guid uuid:
+                    AppendGuid(uuid);
+                    return ColumnType.Uuid;
+
+                case sbyte i8:
+                    AppendByte(i8);
+                    return ColumnType.Int8;
+
+                case short i16:
+                    AppendShort(i16);
+                    return ColumnType.Int16;
+
+                case float f32:
+                    AppendFloat(f32);
+                    return ColumnType.Float;
+
+                case double f64:
+                    AppendDouble(f64);
+                    return ColumnType.Double;
+
+                case byte[] bytes:
+                    AppendBytes(bytes);
+                    return ColumnType.ByteArray;
+
+                case decimal dec:
+                    var bigDec0 = new BigDecimal(dec);
+                    AppendBigDecimal(bigDec0, bigDec0.Scale);
+                    return ColumnType.Decimal;
+
+                case BigDecimal bigDec:
+                    AppendBigDecimal(bigDec, bigDec.Scale);
+                    return ColumnType.Decimal;
+
+                case LocalDate localDate:
+                    AppendDate(localDate);
+                    return ColumnType.Date;
+
+                case LocalTime localTime:
+                    AppendTime(localTime, TemporalTypes.MaxTimePrecision);
+                    return ColumnType.Time;
+
+                case LocalDateTime localDateTime:
+                    AppendDateTime(localDateTime, 
TemporalTypes.MaxTimePrecision);
+                    return ColumnType.Datetime;
+
+                case Instant instant:
+                    AppendTimestamp(instant, TemporalTypes.MaxTimePrecision);
+                    return ColumnType.Timestamp;
+
+                case Period period:
+                    AppendPeriod(period);
+                    return ColumnType.Period;
+
+                case Duration duration:
+                    AppendDuration(duration);
+                    return ColumnType.Duration;
+
+                default:
+                    throw new 
IgniteClientException(ErrorGroups.Client.Protocol, "Unsupported type: " + 
value.GetType());
+            }
+        }
+
         /// <summary>
         /// Appends an object.
         /// </summary>
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
index 36c1ff5a8f6..635d083aab8 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
@@ -23,8 +23,6 @@ using System.Buffers.Binary;
 using System.Collections.Generic;
 using BinaryTuple;
 using Buffers;
-using Ignite.Sql;
-using Marshalling;
 
 /// <summary>
 /// MsgPack writer. Wraps <see cref="PooledArrayBuffer"/>. Writer index is 
kept by the buffer, so this struct is readonly.
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
index 9537085964c..9905fc75c22 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
@@ -408,9 +408,32 @@ internal static class DataStreamerWithReceiver
         using var builder = new BinaryTupleBuilder(binaryTupleSize);
 
         builder.AppendString(className);
-        builder.AppendObjectWithType(arg);
 
-        builder.AppendObjectCollectionWithType(items);
+        if (arg is IIgniteTuple tupleArg)
+        {
+            builder.AppendInt(TupleWithSchemaMarshalling.TypeIdTuple);
+            builder.AppendInt(0); // Scale.
+            builder.AppendBytes(static (bufWriter, arg) => 
TupleWithSchemaMarshalling.Pack(bufWriter, arg), tupleArg);
+        }
+        else
+        {
+            builder.AppendObjectWithType(arg);
+        }
+
+        if (items[0] is IIgniteTuple)
+        {
+            builder.AppendInt(TupleWithSchemaMarshalling.TypeIdTuple);
+            builder.AppendInt(items.Length);
+
+            foreach (var item in items)
+            {
+                builder.AppendBytes(static (bufWriter, arg) => 
TupleWithSchemaMarshalling.Pack(bufWriter, (IIgniteTuple)arg!), item);
+            }
+        }
+        else
+        {
+            builder.AppendObjectCollectionWithType(items);
+        }
 
         w.Write(binaryTupleSize);
         w.Write(builder.Build().Span);
@@ -457,6 +480,19 @@ internal static class DataStreamerWithReceiver
 
             var tuple = new BinaryTupleReader(reader.ReadBinary(), 
numElements);
 
+            if (tuple.GetInt(0) == TupleWithSchemaMarshalling.TypeIdTuple)
+            {
+                int elementCount = tuple.GetInt(1);
+                T[] resultsPooledArr = ArrayPool<T>.Shared.Rent(elementCount);
+
+                for (var i = 0; i < elementCount; i++)
+                {
+                    resultsPooledArr[i] = 
(T)(object)TupleWithSchemaMarshalling.Unpack(tuple.GetBytesSpan(2 + i));
+                }
+
+                return (resultsPooledArr, elementCount);
+            }
+
             return tuple.GetObjectCollectionWithType<T>();
         }
     }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TupleWithSchemaMarshalling.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TupleWithSchemaMarshalling.cs
new file mode 100644
index 00000000000..14bb0745fe8
--- /dev/null
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TupleWithSchemaMarshalling.cs
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Table.Serialization;
+
+using System;
+using System.Buffers;
+using System.Buffers.Binary;
+using Ignite.Sql;
+using Ignite.Table;
+using Proto.BinaryTuple;
+
+/// <summary>
+/// Tuple with schema marshalling - see also 
o.a.i.internal.binarytuple.inlineschema.TupleWithSchemaMarshalling.
+/// </summary>
+internal static class TupleWithSchemaMarshalling
+{
+    /// <summary>
+    /// Type id for <see cref="IIgniteTuple"/>.
+    /// </summary>
+    public const int TypeIdTuple = -1;
+
+    /// <summary>
+    /// Packs tuple with schema.
+    /// </summary>
+    /// <param name="w">Packer.</param>
+    /// <param name="tuple">Tuple.</param>
+    public static void Pack(IBufferWriter<byte> w, IIgniteTuple tuple)
+    {
+        int elementCount = tuple.FieldCount;
+
+        using var schemaBuilder = new BinaryTupleBuilder(elementCount * 2);
+        using var valueBuilder = new BinaryTupleBuilder(elementCount);
+
+        for (int i = 0; i < elementCount; i++)
+        {
+            string fieldName = tuple.GetName(i);
+            object? fieldValue = tuple[i];
+
+            schemaBuilder.AppendString(fieldName);
+
+            if (fieldValue is IIgniteTuple nestedTuple)
+            {
+                valueBuilder.AppendBytes(static (bufWriter, arg) => 
Pack(bufWriter, arg), nestedTuple);
+                schemaBuilder.AppendInt(TypeIdTuple);
+            }
+            else
+            {
+                ColumnType typeId = 
valueBuilder.AppendObjectAndGetType(fieldValue);
+                schemaBuilder.AppendInt((int)typeId);
+            }
+        }
+
+        Memory<byte> schemaMem = schemaBuilder.Build();
+        Memory<byte> valueMem = valueBuilder.Build();
+
+        // Size: int32 (tuple size), int32 (value offset), schema, value.
+        var schemaOffset = 8;
+        var valueOffset = schemaOffset + schemaMem.Length;
+        var totalSize = valueOffset + valueMem.Length;
+
+        Span<byte> targetSpan = w.GetSpan(totalSize);
+        w.Advance(totalSize);
+
+        BinaryPrimitives.WriteInt32LittleEndian(targetSpan, elementCount);
+        BinaryPrimitives.WriteInt32LittleEndian(targetSpan[4..], valueOffset);
+        schemaMem.Span.CopyTo(targetSpan[schemaOffset..]);
+        valueMem.Span.CopyTo(targetSpan[valueOffset..]);
+    }
+
+    /// <summary>
+    /// Unpacks tuple with schema.
+    /// </summary>
+    /// <param name="span">Bytes.</param>
+    /// <returns>Tuple.</returns>
+    public static IgniteTuple Unpack(ReadOnlySpan<byte> span)
+    {
+        int elementCount = BinaryPrimitives.ReadInt32LittleEndian(span);
+        int valueOffset = BinaryPrimitives.ReadInt32LittleEndian(span[4..]);
+
+        ReadOnlySpan<byte> schemaBytes = span[8..valueOffset];
+        ReadOnlySpan<byte> valueBytes = span[valueOffset..];
+
+        var res = new IgniteTuple(elementCount);
+
+        var schemaReader = new BinaryTupleReader(schemaBytes, elementCount * 
2);
+        var valueReader = new BinaryTupleReader(valueBytes, elementCount);
+
+        for (int i = 0; i < elementCount; i++)
+        {
+            string fieldName = schemaReader.GetString(i * 2);
+            int fieldTypeId = schemaReader.GetInt(i * 2 + 1);
+
+            if (fieldTypeId == TypeIdTuple)
+            {
+                res[fieldName] = Unpack(valueReader.GetBytesSpan(i));
+            }
+            else
+            {
+                res[fieldName] = valueReader.GetObject(i, 
(ColumnType)fieldTypeId);
+            }
+        }
+
+        return res;
+    }
+}

Reply via email to