This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new bdd78f789e5 IGNITE-23033 .NET: Support tuples with schemas in Compute
and Streamer (#5793)
bdd78f789e5 is described below
commit bdd78f789e5cada95b7e867b945f2ac49f3dbf33
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri May 9 13:06:55 2025 +0300
IGNITE-23033 .NET: Support tuples with schemas in Compute and Streamer
(#5793)
Allow `IIgniteTuple` (including nested tuples) as:
* Compute: args, results
* Streamer with receiver: payload, args, results
---
.../Apache.Ignite.Tests/Compute/ComputeTests.cs | 38 +++++++
.../Compute/PlatformComputeTests.cs | 92 ++++++++++------
.../Apache.Ignite.Tests/Compute/TestCases.cs | 92 ++++++++++++++++
.../Apache.Ignite.Tests/Table/DataStreamerTests.cs | 87 ++++++++++-----
.../Internal/Compute/ComputePacker.cs | 11 +-
.../Proto/BinaryTuple/BinaryTupleBuilder.cs | 92 ++++++++++++++++
.../Internal/Proto/MsgPack/MsgPackWriter.cs | 2 -
.../Internal/Table/DataStreamerWithReceiver.cs | 40 ++++++-
.../Serialization/TupleWithSchemaMarshalling.cs | 120 +++++++++++++++++++++
9 files changed, 511 insertions(+), 63 deletions(-)
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 387f38b40b9..7c6a97d1755 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -895,6 +895,44 @@ namespace Apache.Ignite.Tests.Compute
Assert.IsNull(nullRes);
}
+ [Test]
+ public async Task TestTupleWithSchemaRoundTrip()
+ {
+ var tuple = TestCases.GetTupleWithAllFieldTypes(x => x is not
decimal);
+ tuple["nested_tuple"] = TestCases.GetTupleWithAllFieldTypes(x => x
is not decimal);
+
+ var nodes = JobTarget.AnyNode(await Client.GetClusterNodesAsync());
+ IJobExecution<object> resExec = await
Client.Compute.SubmitAsync(nodes, EchoJob, tuple);
+ var res = await resExec.GetResultAsync();
+
+ Assert.AreEqual(tuple, res);
+ }
+
+ [Test]
+ public async Task TestDeepNestedTupleWithSchemaRoundTrip()
+ {
+ var tuple = TestCases.GetNestedTuple(100);
+
+ var nodes = JobTarget.AnyNode(await Client.GetClusterNodesAsync());
+ IJobExecution<object> resExec = await
Client.Compute.SubmitAsync(nodes, EchoJob, tuple);
+ var res = await resExec.GetResultAsync();
+
+ Assert.AreEqual(tuple, res);
+ StringAssert.Contains("CHILD99 = IgniteTuple { ID = 99, CHILD100 =
IgniteTuple { ID = 100 } } } } } } } }", res.ToString());
+ }
+
+ [Test]
+ public async Task TestDeepNestedTupleWithSchemaToString()
+ {
+ var tuple = TestCases.GetNestedTuple(100);
+
+ var nodes = JobTarget.AnyNode(await Client.GetClusterNodesAsync());
+ IJobExecution<string> resExec = await
Client.Compute.SubmitAsync(nodes, ToStringJob, tuple);
+ var res = await resExec.GetResultAsync();
+
+ StringAssert.Contains("[ID=97, CHILD98=TupleImpl [ID=98,
CHILD99=TupleImpl [ID=99, CHILD100=TupleImpl [ID=100]]]]]]]]", res);
+ }
+
private static async Task AssertJobStatus<T>(IJobExecution<T>
jobExecution, JobStatus status, Instant beforeStart)
{
JobState? state = await jobExecution.GetStateAsync();
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/PlatformComputeTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/PlatformComputeTests.cs
index cc15099847a..45246635059 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/PlatformComputeTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/PlatformComputeTests.cs
@@ -24,13 +24,22 @@ using System.Linq;
using System.Threading.Tasks;
using Ignite.Compute;
using Ignite.Marshalling;
+using Ignite.Table;
using Network;
-using NodaTime;
using NUnit.Framework;
using TestHelpers;
/// <summary>
/// Tests for platform compute (non-Java jobs).
+/// <para />
+/// Development:
+/// - Changing test code, job code: no need to restart Ignite.
+/// - Changing core code: restart Ignite servers and do a full .NET solution
rebuild to reflect the changes in .NET compute executor.
+/// <para />
+/// Debugging:
+/// - Run tests once so that .NET executor processes are started.
+/// - Attach to the executor processes.
+/// - Run tests again to debug the executor.
/// </summary>
public class PlatformComputeTests : IgniteTestsBase
{
@@ -64,7 +73,28 @@ public class PlatformComputeTests : IgniteTestsBase
}
[Test]
- [TestCaseSource(nameof(ArgTypesTestCases))]
+ public async Task TestBroadcastJob()
+ {
+ var jobDesc = DotNetJobs.Echo with { DeploymentUnits =
[_defaultTestUnit] };
+ var jobTarget = BroadcastJobTarget.Nodes(
+ await GetClusterNodeAsync(),
+ await GetClusterNodeAsync("_2"),
+ await GetClusterNodeAsync("_3"));
+
+ var jobExec = await Client.Compute.SubmitBroadcastAsync(
+ jobTarget,
+ jobDesc,
+ "Hello world!");
+
+ foreach (var job in jobExec.JobExecutions)
+ {
+ var res = await job.GetResultAsync();
+ Assert.AreEqual("Hello world!", res);
+ }
+ }
+
+ [Test]
+ [TestCaseSource(typeof(TestCases), nameof(TestCases.SupportedArgs))]
public async Task TestAllSupportedArgTypes(object val)
{
var result = await ExecJobAsync(DotNetJobs.Echo, val);
@@ -216,6 +246,35 @@ public class PlatformComputeTests : IgniteTestsBase
Assert.AreEqual(2, assemblyLoadContextCount);
}
+ [Test]
+ public async Task TestTupleWithSchemaRoundTrip()
+ {
+ var tuple = TestCases.GetTupleWithAllFieldTypes();
+ tuple["nested_tuple"] = TestCases.GetTupleWithAllFieldTypes(x => x is
not decimal);
+
+ var expectedTuple = Enumerable.Range(0, tuple.FieldCount).Aggregate(
+ seed: new IgniteTuple(),
+ (acc, i) =>
+ {
+ acc[tuple.GetName(i)] = tuple[i] is decimal d ? new
BigDecimal(d) : tuple[i];
+ return acc;
+ });
+
+ var res = (IIgniteTuple)(await ExecJobAsync(DotNetJobs.Echo, tuple))!;
+
+ Assert.AreEqual(expectedTuple, res);
+ }
+
+ [Test]
+ public async Task TestDeepNestedTupleWithSchemaRoundTrip()
+ {
+ var tuple = TestCases.GetNestedTuple(100);
+ var res = await ExecJobAsync(DotNetJobs.Echo, tuple);
+
+ Assert.AreEqual(tuple, res);
+ StringAssert.Contains("CHILD99 = IgniteTuple { ID = 99, CHILD100 =
IgniteTuple { ID = 100 } } } } } } } }", res?.ToString());
+ }
+
private static async Task<DeploymentUnit> DeployTestsAssembly(string?
unitId = null, string? unitVersion = null)
{
var testsDll = typeof(PlatformComputeTests).Assembly.Location;
@@ -231,35 +290,6 @@ public class PlatformComputeTests : IgniteTestsBase
return new DeploymentUnit(unitId0, unitVersion0);
}
- private static IEnumerable<object> ArgTypesTestCases() => [
- sbyte.MinValue,
- sbyte.MaxValue,
- short.MinValue,
- short.MaxValue,
- int.MinValue,
- int.MaxValue,
- long.MinValue,
- long.MaxValue,
- float.MinValue,
- float.MaxValue,
- double.MinValue,
- double.MaxValue,
- 123.456m,
- -123.456m,
- decimal.MinValue,
- decimal.MaxValue,
- new BigDecimal(long.MinValue, 10),
- new BigDecimal(long.MaxValue, 20),
- new byte[] { 1, 255 },
- "Ignite 🔥",
- LocalDate.MinIsoValue,
- LocalTime.Noon,
- LocalDateTime.MaxIsoValue,
- Instant.FromUtc(2001, 3, 4, 5, 6),
- Guid.Empty,
- new Guid(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14,
15, 16 }),
- ];
-
private async Task<IClusterNode> GetClusterNodeAsync(string? suffix = null)
{
var nodeName = ComputeTests.PlatformTestNodeRunner + suffix;
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/TestCases.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/TestCases.cs
new file mode 100644
index 00000000000..e83168ddd68
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/TestCases.cs
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests.Compute;
+
+using System;
+using System.Collections.Generic;
+using Ignite.Table;
+using NodaTime;
+
+/// <summary>
+/// Compute test cases.
+/// </summary>
+public static class TestCases
+{
+ public static List<object> SupportedArgs => [
+ sbyte.MinValue,
+ sbyte.MaxValue,
+ short.MinValue,
+ short.MaxValue,
+ int.MinValue,
+ int.MaxValue,
+ long.MinValue,
+ long.MaxValue,
+ float.MinValue,
+ float.MaxValue,
+ double.MinValue,
+ double.MaxValue,
+ 123.456m,
+ -123.456m,
+ decimal.MinValue,
+ decimal.MaxValue,
+ new BigDecimal(long.MinValue, 10),
+ new BigDecimal(long.MaxValue, 20),
+ new byte[] { 1, 255 },
+ "Ignite 🔥",
+ LocalDate.MinIsoValue,
+ LocalTime.Noon,
+ LocalDateTime.MaxIsoValue,
+ Instant.FromUtc(2001, 3, 4, 5, 6),
+ Guid.Empty,
+ new Guid(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14,
15, 16 }),
+ ];
+
+ public static IIgniteTuple GetTupleWithAllFieldTypes(Func<object, bool>?
filter = null)
+ {
+ var res = new IgniteTuple();
+
+ for (var i = 0; i < SupportedArgs.Count; i++)
+ {
+ var val = SupportedArgs[i];
+
+ if (filter != null && !filter(val))
+ {
+ continue;
+ }
+
+ res[$"v{i}"] = val;
+ }
+
+ return res;
+ }
+
+ public static IIgniteTuple GetNestedTuple(int depth)
+ {
+ var res = new IgniteTuple { ["id"] = "root" };
+ var current = res;
+
+ for (var i = 1; i <= depth; i++)
+ {
+ var nested = new IgniteTuple { ["id"] = i };
+ current[$"child{i}"] = nested;
+ current = nested;
+ }
+
+ return res;
+ }
+}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
index c343a8efafd..a256558e498 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
@@ -40,6 +40,8 @@ public class DataStreamerTests : IgniteTestsBase
{
private const string TestReceiverClassName =
ComputeTests.PlatformTestNodeRunner + "$TestReceiver";
+ private const string EchoReceiverClassName =
ComputeTests.PlatformTestNodeRunner + "$EchoReceiver";
+
private const string EchoArgsReceiverClassName =
ComputeTests.PlatformTestNodeRunner + "$EchoArgsReceiver";
private const string UpsertElementTypeNameReceiverClassName =
ComputeTests.PlatformTestNodeRunner + "$UpsertElementTypeNameReceiver";
@@ -54,29 +56,9 @@ public class DataStreamerTests : IgniteTestsBase
private static readonly ReceiverDescriptor<object?> TestReceiverNoResults
= new(TestReceiverClassName);
- private static readonly ReceiverDescriptor<object, object>
EchoArgsReceiver = new(EchoArgsReceiverClassName);
+ private static readonly ReceiverDescriptor<object?, object> EchoReceiver =
new(EchoReceiverClassName);
- private static readonly object[] AllSupportedTypes =
- {
- true,
- sbyte.MaxValue,
- short.MinValue,
- int.MaxValue,
- long.MinValue,
- float.MaxValue,
- double.MinValue,
- decimal.One,
- new BigDecimal(1234, 2),
- new LocalDate(1234, 5, 6),
- new LocalTime(12, 3, 4, 567),
- new LocalDateTime(1234, 5, 6, 7, 8, 9),
- Instant.FromUnixTimeSeconds(123456),
- Guid.Empty,
- "str123",
- new byte[] { 1, 2, 3 },
- Period.FromDays(999),
- Duration.FromSeconds(12345),
- };
+ private static readonly ReceiverDescriptor<object, object>
EchoArgsReceiver = new(EchoArgsReceiverClassName);
private static int _unknownKey = 333000;
@@ -805,8 +787,26 @@ public class DataStreamerTests : IgniteTestsBase
ex.Message);
}
- [TestCaseSource(nameof(AllSupportedTypes))]
- public async Task TestEchoReceiverAllDataTypes(object arg)
+ [TestCaseSource(typeof(TestCases), nameof(TestCases.SupportedArgs))]
+ public async Task TestEchoReceiverAllDataTypes(object payload)
+ {
+ var res = await PocoView.StreamDataAsync<object, object, object?,
object>(
+ new[] { payload }.ToAsyncEnumerable(),
+ keySelector: _ => new Poco(),
+ payloadSelector: x => x,
+ EchoReceiver,
+ receiverArg: null).SingleAsync();
+
+ if (payload is decimal dec)
+ {
+ payload = new BigDecimal(dec);
+ }
+
+ Assert.AreEqual(payload, res);
+ }
+
+ [TestCaseSource(typeof(TestCases), nameof(TestCases.SupportedArgs))]
+ public async Task TestEchoArgsReceiverAllDataTypes(object arg)
{
var res = await PocoView.StreamDataAsync<object, object, object,
object>(
new object[] { 1 }.ToAsyncEnumerable(),
@@ -823,6 +823,45 @@ public class DataStreamerTests : IgniteTestsBase
Assert.AreEqual(arg, res);
}
+ [Test]
+ public async Task TestEchoReceiverTuple()
+ {
+ var count = 5_000;
+
+ var payload = TestCases.GetTupleWithAllFieldTypes(x => x is not
decimal);
+ payload["nested"] = new IgniteTuple { ["foo"] = "bar" };
+
+ List<object> res = await PocoView.StreamDataAsync<object, object,
object?, object>(
+ Enumerable.Range(1, count).Select(x =>
payload).ToAsyncEnumerable(),
+ keySelector: _ => new Poco(),
+ payloadSelector: x => x,
+ EchoReceiver,
+ receiverArg: null).ToListAsync();
+
+ Assert.AreEqual(count, res.Count);
+
+ for (int i = 0; i < count; i++)
+ {
+ Assert.AreEqual(payload, res[i]);
+ }
+ }
+
+ [Test]
+ public async Task TestEchoArgsReceiverTuple()
+ {
+ var arg = TestCases.GetTupleWithAllFieldTypes(x => x is not decimal);
+ arg["nested"] = new IgniteTuple { ["foo"] = "bar" };
+
+ var res = await PocoView.StreamDataAsync<object, object, object,
object>(
+ new object[] { 1 }.ToAsyncEnumerable(),
+ keySelector: _ => new Poco(),
+ payloadSelector: x => x.ToString()!,
+ EchoArgsReceiver,
+ receiverArg: arg).SingleAsync();
+
+ Assert.AreEqual(arg, res);
+ }
+
[Test]
public async Task TestResultConsumerEarlyExit()
{
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs
index 165d05f8bad..0198924c163 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/ComputePacker.cs
@@ -22,6 +22,7 @@ using System.Buffers;
using Ignite.Table;
using Marshalling;
using Proto.MsgPack;
+using Table.Serialization;
/// <summary>
/// Compute packer utils.
@@ -68,13 +69,14 @@ internal static class ComputePacker
return;
}
- if (obj is IIgniteTuple)
+ if (obj is IIgniteTuple tuple)
{
- // TODO: IGNITE-23033 .NET: Thin 3.0: Support tuples with schemas
in Compute
w.Write(Tuple);
- throw new NotImplementedException("IGNITE-23033");
+ w.Write(static (bufWriter, arg) =>
TupleWithSchemaMarshalling.Pack(bufWriter, arg), tuple);
+ return;
}
+ // TODO IGNITE-25337 Automatic POCO serialization.
w.Write(Native);
w.WriteObjectAsBinaryTuple(obj);
}
@@ -95,9 +97,10 @@ internal static class ComputePacker
int type = r.ReadInt32();
+ // TODO IGNITE-25337 Automatic POCO serialization.
return type switch
{
- Tuple => throw new NotImplementedException("IGNITE-23033"),
+ Tuple =>
(T)(object)TupleWithSchemaMarshalling.Unpack(r.ReadBinary()),
MarshallerObject => Unmarshal(ref r, marshaller),
_ => (T)r.ReadObjectFromBinaryTuple()!
};
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
index 986d1d9ec86..5ec5f7316b8 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
@@ -998,6 +998,98 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
}
}
+ /// <summary>
+ /// Appends an object without type code and returns type.
+ /// </summary>
+ /// <param name="value">Value.</param>
+ /// <returns>Resulting column type.</returns>
+ public ColumnType AppendObjectAndGetType(
+ object? value)
+ {
+ switch (value)
+ {
+ case null:
+ AppendNull(); // Value.
+ return ColumnType.Null;
+
+ case bool b:
+ AppendBool(b);
+ return ColumnType.Boolean;
+
+ case int i32:
+ AppendInt(i32);
+ return ColumnType.Int32;
+
+ case long i64:
+ AppendLong(i64);
+ return ColumnType.Int64;
+
+ case string str:
+ AppendString(str);
+ return ColumnType.String;
+
+ case Guid uuid:
+ AppendGuid(uuid);
+ return ColumnType.Uuid;
+
+ case sbyte i8:
+ AppendByte(i8);
+ return ColumnType.Int8;
+
+ case short i16:
+ AppendShort(i16);
+ return ColumnType.Int16;
+
+ case float f32:
+ AppendFloat(f32);
+ return ColumnType.Float;
+
+ case double f64:
+ AppendDouble(f64);
+ return ColumnType.Double;
+
+ case byte[] bytes:
+ AppendBytes(bytes);
+ return ColumnType.ByteArray;
+
+ case decimal dec:
+ var bigDec0 = new BigDecimal(dec);
+ AppendBigDecimal(bigDec0, bigDec0.Scale);
+ return ColumnType.Decimal;
+
+ case BigDecimal bigDec:
+ AppendBigDecimal(bigDec, bigDec.Scale);
+ return ColumnType.Decimal;
+
+ case LocalDate localDate:
+ AppendDate(localDate);
+ return ColumnType.Date;
+
+ case LocalTime localTime:
+ AppendTime(localTime, TemporalTypes.MaxTimePrecision);
+ return ColumnType.Time;
+
+ case LocalDateTime localDateTime:
+ AppendDateTime(localDateTime,
TemporalTypes.MaxTimePrecision);
+ return ColumnType.Datetime;
+
+ case Instant instant:
+ AppendTimestamp(instant, TemporalTypes.MaxTimePrecision);
+ return ColumnType.Timestamp;
+
+ case Period period:
+ AppendPeriod(period);
+ return ColumnType.Period;
+
+ case Duration duration:
+ AppendDuration(duration);
+ return ColumnType.Duration;
+
+ default:
+ throw new
IgniteClientException(ErrorGroups.Client.Protocol, "Unsupported type: " +
value.GetType());
+ }
+ }
+
/// <summary>
/// Appends an object.
/// </summary>
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
index 36c1ff5a8f6..635d083aab8 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
@@ -23,8 +23,6 @@ using System.Buffers.Binary;
using System.Collections.Generic;
using BinaryTuple;
using Buffers;
-using Ignite.Sql;
-using Marshalling;
/// <summary>
/// MsgPack writer. Wraps <see cref="PooledArrayBuffer"/>. Writer index is
kept by the buffer, so this struct is readonly.
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
index 9537085964c..9905fc75c22 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
@@ -408,9 +408,32 @@ internal static class DataStreamerWithReceiver
using var builder = new BinaryTupleBuilder(binaryTupleSize);
builder.AppendString(className);
- builder.AppendObjectWithType(arg);
- builder.AppendObjectCollectionWithType(items);
+ if (arg is IIgniteTuple tupleArg)
+ {
+ builder.AppendInt(TupleWithSchemaMarshalling.TypeIdTuple);
+ builder.AppendInt(0); // Scale.
+ builder.AppendBytes(static (bufWriter, arg) =>
TupleWithSchemaMarshalling.Pack(bufWriter, arg), tupleArg);
+ }
+ else
+ {
+ builder.AppendObjectWithType(arg);
+ }
+
+ if (items[0] is IIgniteTuple)
+ {
+ builder.AppendInt(TupleWithSchemaMarshalling.TypeIdTuple);
+ builder.AppendInt(items.Length);
+
+ foreach (var item in items)
+ {
+ builder.AppendBytes(static (bufWriter, arg) =>
TupleWithSchemaMarshalling.Pack(bufWriter, (IIgniteTuple)arg!), item);
+ }
+ }
+ else
+ {
+ builder.AppendObjectCollectionWithType(items);
+ }
w.Write(binaryTupleSize);
w.Write(builder.Build().Span);
@@ -457,6 +480,19 @@ internal static class DataStreamerWithReceiver
var tuple = new BinaryTupleReader(reader.ReadBinary(),
numElements);
+ if (tuple.GetInt(0) == TupleWithSchemaMarshalling.TypeIdTuple)
+ {
+ int elementCount = tuple.GetInt(1);
+ T[] resultsPooledArr = ArrayPool<T>.Shared.Rent(elementCount);
+
+ for (var i = 0; i < elementCount; i++)
+ {
+ resultsPooledArr[i] =
(T)(object)TupleWithSchemaMarshalling.Unpack(tuple.GetBytesSpan(2 + i));
+ }
+
+ return (resultsPooledArr, elementCount);
+ }
+
return tuple.GetObjectCollectionWithType<T>();
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TupleWithSchemaMarshalling.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TupleWithSchemaMarshalling.cs
new file mode 100644
index 00000000000..14bb0745fe8
--- /dev/null
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/TupleWithSchemaMarshalling.cs
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Table.Serialization;
+
+using System;
+using System.Buffers;
+using System.Buffers.Binary;
+using Ignite.Sql;
+using Ignite.Table;
+using Proto.BinaryTuple;
+
+/// <summary>
+/// Tuple with schema marshalling - see also
o.a.i.internal.binarytuple.inlineschema.TupleWithSchemaMarshalling.
+/// </summary>
+internal static class TupleWithSchemaMarshalling
+{
+ /// <summary>
+ /// Type id for <see cref="IIgniteTuple"/>.
+ /// </summary>
+ public const int TypeIdTuple = -1;
+
+ /// <summary>
+ /// Packs tuple with schema.
+ /// </summary>
+ /// <param name="w">Packer.</param>
+ /// <param name="tuple">Tuple.</param>
+ public static void Pack(IBufferWriter<byte> w, IIgniteTuple tuple)
+ {
+ int elementCount = tuple.FieldCount;
+
+ using var schemaBuilder = new BinaryTupleBuilder(elementCount * 2);
+ using var valueBuilder = new BinaryTupleBuilder(elementCount);
+
+ for (int i = 0; i < elementCount; i++)
+ {
+ string fieldName = tuple.GetName(i);
+ object? fieldValue = tuple[i];
+
+ schemaBuilder.AppendString(fieldName);
+
+ if (fieldValue is IIgniteTuple nestedTuple)
+ {
+ valueBuilder.AppendBytes(static (bufWriter, arg) =>
Pack(bufWriter, arg), nestedTuple);
+ schemaBuilder.AppendInt(TypeIdTuple);
+ }
+ else
+ {
+ ColumnType typeId =
valueBuilder.AppendObjectAndGetType(fieldValue);
+ schemaBuilder.AppendInt((int)typeId);
+ }
+ }
+
+ Memory<byte> schemaMem = schemaBuilder.Build();
+ Memory<byte> valueMem = valueBuilder.Build();
+
+ // Size: int32 (tuple size), int32 (value offset), schema, value.
+ var schemaOffset = 8;
+ var valueOffset = schemaOffset + schemaMem.Length;
+ var totalSize = valueOffset + valueMem.Length;
+
+ Span<byte> targetSpan = w.GetSpan(totalSize);
+ w.Advance(totalSize);
+
+ BinaryPrimitives.WriteInt32LittleEndian(targetSpan, elementCount);
+ BinaryPrimitives.WriteInt32LittleEndian(targetSpan[4..], valueOffset);
+ schemaMem.Span.CopyTo(targetSpan[schemaOffset..]);
+ valueMem.Span.CopyTo(targetSpan[valueOffset..]);
+ }
+
+ /// <summary>
+ /// Unpacks tuple with schema.
+ /// </summary>
+ /// <param name="span">Bytes.</param>
+ /// <returns>Tuple.</returns>
+ public static IgniteTuple Unpack(ReadOnlySpan<byte> span)
+ {
+ int elementCount = BinaryPrimitives.ReadInt32LittleEndian(span);
+ int valueOffset = BinaryPrimitives.ReadInt32LittleEndian(span[4..]);
+
+ ReadOnlySpan<byte> schemaBytes = span[8..valueOffset];
+ ReadOnlySpan<byte> valueBytes = span[valueOffset..];
+
+ var res = new IgniteTuple(elementCount);
+
+ var schemaReader = new BinaryTupleReader(schemaBytes, elementCount *
2);
+ var valueReader = new BinaryTupleReader(valueBytes, elementCount);
+
+ for (int i = 0; i < elementCount; i++)
+ {
+ string fieldName = schemaReader.GetString(i * 2);
+ int fieldTypeId = schemaReader.GetInt(i * 2 + 1);
+
+ if (fieldTypeId == TypeIdTuple)
+ {
+ res[fieldName] = Unpack(valueReader.GetBytesSpan(i));
+ }
+ else
+ {
+ res[fieldName] = valueReader.GetObject(i,
(ColumnType)fieldTypeId);
+ }
+ }
+
+ return res;
+ }
+}