This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a467ff103 IGNITE-22356 .NET: Add results support to Data Streamer 
with receiver (#3935)
9a467ff103 is described below

commit 9a467ff103743ec1f78dfe7aa34201de781ef41e
Author: Pavel Tupitsyn <ptupit...@apache.org>
AuthorDate: Mon Jun 17 17:19:58 2024 +0300

    IGNITE-22356 .NET: Add results support to Data Streamer with receiver 
(#3935)
    
    * Implement receiver results deserialization
    * Pass the results to the user as `IAsyncEnumerable<TResult>`
    * The resulting `IAsyncEnumerable` applies back-pressure to the streamer (we 
don't want to buffer an infinite number of results), so the user has to consume 
the results to complete the streaming
    * The user can choose to consume the results partially by disposing the 
enumerator (or ending the `await foreach` loop). We still complete the 
streaming in this case, but discard the remaining results
    * If the resulting `IAsyncEnumerable` is cancelled, the streamer is 
cancelled too
---
 .../Proto/BinaryTuple/BinaryTupleTests.cs          |   8 +-
 .../Apache.Ignite.Tests/Table/DataStreamerTests.cs | 261 ++++++++++++++++++++-
 .../Proto/BinaryTuple/BinaryTupleBuilder.cs        |  15 ++
 .../Proto/BinaryTuple/BinaryTupleReader.cs         |  37 +++
 .../Internal/Table/DataStreamerWithReceiver.cs     |  77 ++++--
 .../Apache.Ignite/Internal/Table/RecordView.cs     | 108 +++++++--
 .../Apache.Ignite/Table/IDataStreamerTarget.cs     |   7 +-
 .../runner/app/PlatformTestNodeRunner.java         |  24 +-
 8 files changed, 487 insertions(+), 50 deletions(-)

diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/BinaryTuple/BinaryTupleTests.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/BinaryTuple/BinaryTupleTests.cs
index 02e5691722..9b93e66287 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/BinaryTuple/BinaryTupleTests.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/BinaryTuple/BinaryTupleTests.cs
@@ -839,8 +839,11 @@ namespace Apache.Ignite.Tests.Proto.BinaryTuple
                     b.AppendObjectWithType(date);
                     b.AppendObjectWithType(dateTime);
                     b.AppendObjectWithType(Instant.FromDateTimeUtc(utcNow));
+                    b.AppendObjectWithType(true);
+                    b.AppendObjectWithType(Period.FromDays(2));
+                    b.AppendObjectWithType(Duration.FromDays(3));
                 },
-                17 * 3);
+                20 * 3);
 
             Assert.IsNull(reader.GetObject(0));
             Assert.AreEqual(sbyte.MaxValue, reader.GetObject(3));
@@ -859,6 +862,9 @@ namespace Apache.Ignite.Tests.Proto.BinaryTuple
             Assert.AreEqual(date, reader.GetObject(42));
             Assert.AreEqual(dateTime, reader.GetObject(45));
             Assert.AreEqual(Instant.FromDateTimeUtc(utcNow), 
reader.GetObject(48));
+            Assert.IsTrue((bool)reader.GetObject(51)!);
+            Assert.AreEqual(Period.FromDays(2), reader.GetObject(54));
+            Assert.AreEqual(Duration.FromDays(3), reader.GetObject(57));
         }
 
         [Test]
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
index 9db142969c..7b7d73f958 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
@@ -22,6 +22,7 @@ using System.Collections;
 using System.Collections.Generic;
 using System.Data;
 using System.Linq;
+using System.Numerics;
 using System.Runtime.CompilerServices;
 using System.Threading;
 using System.Threading.Tasks;
@@ -42,6 +43,8 @@ public class DataStreamerTests : IgniteTestsBase
 {
     private const string TestReceiverClassName = 
ComputeTests.PlatformTestNodeRunner + "$TestReceiver";
 
+    private const string EchoArgsReceiverClassName = 
ComputeTests.PlatformTestNodeRunner + "$EchoArgsReceiver";
+
     private const string UpsertElementTypeNameReceiverClassName = 
ComputeTests.PlatformTestNodeRunner + "$UpsertElementTypeNameReceiver";
 
     private const int Count = 100;
@@ -50,6 +53,29 @@ public class DataStreamerTests : IgniteTestsBase
 
     private const int DeletedKey = Count + 1;
 
+    private static readonly object[] AllSupportedTypes =
+    {
+        true,
+        sbyte.MaxValue,
+        short.MinValue,
+        int.MaxValue,
+        long.MinValue,
+        float.MaxValue,
+        double.MinValue,
+        decimal.One,
+        new LocalDate(1234, 5, 6),
+        new LocalTime(12, 3, 4, 567),
+        new LocalDateTime(1234, 5, 6, 7, 8, 9),
+        Instant.FromUnixTimeSeconds(123456),
+        Guid.Empty,
+        new BitArray(new[] { byte.MaxValue }),
+        "str123",
+        new byte[] { 1, 2, 3 },
+        Period.FromDays(999),
+        Duration.FromSeconds(12345),
+        new BigInteger(12.34)
+    };
+
     private static int _unknownKey = 333000;
 
     [SetUp]
@@ -179,7 +205,7 @@ public class DataStreamerTests : IgniteTestsBase
     }
 
     [Test]
-    public void TestOptionsValidation()
+    public void TestOptionsValidation([Values(true, false, null)] bool? 
withReceiverResults)
     {
         AssertException(DataStreamerOptions.Default with { PageSize = -10 }, 
"PageSize should be positive.");
         AssertException(DataStreamerOptions.Default with { RetryLimit = -1 }, 
"RetryLimit should be non-negative.");
@@ -190,7 +216,42 @@ public class DataStreamerTests : IgniteTestsBase
         void AssertException(DataStreamerOptions options, string message)
         {
             var ex = Assert.ThrowsAsync<ArgumentException>(
-                async () => await 
Table.RecordBinaryView.StreamDataAsync(Array.Empty<IIgniteTuple>().ToAsyncEnumerable(),
 options));
+                async () =>
+                {
+                    switch (withReceiverResults)
+                    {
+                        // No receiver.
+                        case null:
+                            await 
Table.RecordBinaryView.StreamDataAsync(Array.Empty<IIgniteTuple>().ToAsyncEnumerable(),
 options);
+                            break;
+
+                        // Receiver without results.
+                        case false:
+                            await 
Table.RecordBinaryView.StreamDataAsync<IIgniteTuple, string>(
+                                
Array.Empty<IIgniteTuple>().ToAsyncEnumerable(),
+                                t => t,
+                                t => t.ToString()!,
+                                Array.Empty<DeploymentUnit>(),
+                                TestReceiverClassName,
+                                null,
+                                options);
+
+                            break;
+
+                        // Receiver with results.
+                        case true:
+                            await 
Table.RecordBinaryView.StreamDataAsync<IIgniteTuple, string, string>(
+                                
Array.Empty<IIgniteTuple>().ToAsyncEnumerable(),
+                                t => t,
+                                t => t.ToString()!,
+                                Array.Empty<DeploymentUnit>(),
+                                TestReceiverClassName,
+                                null,
+                                options).ToListAsync();
+
+                            break;
+                    }
+                });
 
             StringAssert.Contains(message, ex?.Message);
         }
@@ -332,6 +393,35 @@ public class DataStreamerTests : IgniteTestsBase
         }
     }
 
+    [Test]
+    public async Task TestWithReceiverWithResultsRecordBinaryView()
+    {
+        IAsyncEnumerable<string> results = TupleView.StreamDataAsync<int, 
string, string>(
+            Enumerable.Range(0, Count).ToAsyncEnumerable(),
+            keySelector: x => GetTuple(x),
+            payloadSelector: x => $"{x}-value{x * 10}",
+            units: Array.Empty<DeploymentUnit>(),
+            receiverClassName: TestReceiverClassName,
+            receiverArgs: new object[] { Table.Name, "arg1", 22 },
+            options: DataStreamerOptions.Default);
+
+        var resultSet = await results.ToHashSetAsync();
+
+        for (int i = 0; i < Count; i++)
+        {
+            var res = await TupleView.GetAsync(null, GetTuple(i));
+
+            var expectedVal = $"value{i * 10}_arg1_22";
+
+            Assert.IsTrue(res.HasValue);
+            Assert.AreEqual(expectedVal, res.Value[ValCol]);
+
+            CollectionAssert.Contains(resultSet, expectedVal);
+        }
+
+        Assert.AreEqual(Count, resultSet.Count);
+    }
+
     [Test]
     public async Task TestWithReceiverRecordView()
     {
@@ -353,6 +443,35 @@ public class DataStreamerTests : IgniteTestsBase
         }
     }
 
+    [Test]
+    public async Task TestWithReceiverResultsRecordView()
+    {
+        IAsyncEnumerable<string> results = PocoView.StreamDataAsync<int, 
string, string>(
+            Enumerable.Range(0, Count).ToAsyncEnumerable(),
+            keySelector: x => GetPoco(x),
+            payloadSelector: x => $"{x}-value{x * 10}",
+            units: Array.Empty<DeploymentUnit>(),
+            receiverClassName: TestReceiverClassName,
+            receiverArgs: new object[] { Table.Name, "arg1", 22 },
+            options: DataStreamerOptions.Default);
+
+        var resultSet = await results.ToHashSetAsync();
+
+        for (int i = 0; i < Count; i++)
+        {
+            var res = await TupleView.GetAsync(null, GetTuple(i));
+
+            var expectedVal = $"value{i * 10}_arg1_22";
+
+            Assert.IsTrue(res.HasValue);
+            Assert.AreEqual(expectedVal, res.Value[ValCol]);
+
+            CollectionAssert.Contains(resultSet, expectedVal);
+        }
+
+        Assert.AreEqual(Count, resultSet.Count);
+    }
+
     [Test]
     public async Task TestWithReceiverKeyValueBinaryView()
     {
@@ -373,6 +492,34 @@ public class DataStreamerTests : IgniteTestsBase
         }
     }
 
+    [Test]
+    public async Task TestWithReceiverResultsKeyValueBinaryView()
+    {
+        IAsyncEnumerable<string> results = 
Table.KeyValueBinaryView.StreamDataAsync<int, string, string>(
+            Enumerable.Range(0, Count).ToAsyncEnumerable(),
+            keySelector: x => new KeyValuePair<IIgniteTuple, 
IIgniteTuple>(GetTuple(x), new IgniteTuple()),
+            payloadSelector: x => $"{x}-value{x * 10}",
+            units: Array.Empty<DeploymentUnit>(),
+            receiverClassName: TestReceiverClassName,
+            receiverArgs: new object[] { Table.Name, "arg1", 22 });
+
+        var resultSet = await results.ToHashSetAsync();
+
+        for (int i = 0; i < Count; i++)
+        {
+            var res = await TupleView.GetAsync(null, GetTuple(i));
+
+            var expectedVal = $"value{i * 10}_arg1_22";
+
+            Assert.IsTrue(res.HasValue);
+            Assert.AreEqual(expectedVal, res.Value[ValCol]);
+
+            CollectionAssert.Contains(resultSet, expectedVal);
+        }
+
+        Assert.AreEqual(Count, resultSet.Count);
+    }
+
     [Test]
     public async Task TestWithReceiverKeyValueView()
     {
@@ -393,6 +540,34 @@ public class DataStreamerTests : IgniteTestsBase
         }
     }
 
+    [Test]
+    public async Task TestWithReceiverResultsKeyValueView()
+    {
+        IAsyncEnumerable<string> results = Table.GetKeyValueView<long, 
Poco>().StreamDataAsync<int, string, string>(
+            Enumerable.Range(0, Count).ToAsyncEnumerable(),
+            keySelector: x => new KeyValuePair<long, Poco>(x, null!),
+            payloadSelector: x => $"{x}-value{x * 10}",
+            units: Array.Empty<DeploymentUnit>(),
+            receiverClassName: TestReceiverClassName,
+            receiverArgs: new object[] { Table.Name, "arg11", 55});
+
+        var resultSet = await results.ToHashSetAsync();
+
+        for (int i = 0; i < Count; i++)
+        {
+            var res = await TupleView.GetAsync(null, GetTuple(i));
+
+            var expectedVal = $"value{i * 10}_arg11_55";
+
+            Assert.IsTrue(res.HasValue);
+            Assert.AreEqual(expectedVal, res.Value[ValCol]);
+
+            CollectionAssert.Contains(resultSet, expectedVal);
+        }
+
+        Assert.AreEqual(Count, resultSet.Count);
+    }
+
     [Test]
     public void TestUnknownReceiverClass()
     {
@@ -422,6 +597,21 @@ public class DataStreamerTests : IgniteTestsBase
         Assert.AreEqual("Streamer receiver failed: Job execution failed: 
java.lang.ArithmeticException: Test exception: 1", ex.Message);
     }
 
+    [Test]
+    public void TestReceiverWithResultsException()
+    {
+        var ex = Assert.ThrowsAsync<IgniteException>(async () =>
+            await PocoView.StreamDataAsync<int, string, string>(
+                Enumerable.Range(0, 1).ToAsyncEnumerable(),
+                keySelector: x => GetPoco(x),
+                payloadSelector: _ => string.Empty,
+                units: Array.Empty<DeploymentUnit>(),
+                receiverClassName: TestReceiverClassName,
+                receiverArgs: new object[] { "throw", "throw", 1 
}).ToListAsync());
+
+        Assert.AreEqual("Streamer receiver failed: Job execution failed: 
java.lang.ArithmeticException: Test exception: 1", ex.Message);
+    }
+
     [Test]
     public void TestReceiverSelectorException([Values(true, false)] bool 
keySelector)
     {
@@ -509,6 +699,73 @@ public class DataStreamerTests : IgniteTestsBase
             ex.Message);
     }
 
+    [TestCaseSource(nameof(AllSupportedTypes))]
+    public async Task TestEchoReceiverAllDataTypes(object arg)
+    {
+        var res = await PocoView.StreamDataAsync<object, object, object>(
+            new object[] { 1 }.ToAsyncEnumerable(),
+            keySelector: x => new Poco(),
+            payloadSelector: x => x.ToString()!,
+            units: Array.Empty<DeploymentUnit>(),
+            receiverClassName: EchoArgsReceiverClassName,
+            receiverArgs: new[] { arg }).SingleAsync();
+
+        Assert.AreEqual(arg, res);
+    }
+
+    [Test]
+    public async Task TestResultConsumerEarlyExit()
+    {
+        IAsyncEnumerable<string> results = PocoView.StreamDataAsync<int, 
string, string>(
+            Enumerable.Range(0, Count).ToAsyncEnumerable(),
+            keySelector: x => GetPoco(x),
+            payloadSelector: x => $"{x}-value{x * 10}",
+            units: Array.Empty<DeploymentUnit>(),
+            receiverClassName: TestReceiverClassName,
+            receiverArgs: new object[] { Table.Name, "arg1", 22 },
+            options: DataStreamerOptions.Default with { PageSize = 1 });
+
+        // Read only part of the results.
+        var resultSet = await results.Take(3).ToListAsync();
+        Assert.AreEqual(3, resultSet.Count);
+
+        for (int i = 0; i < Count; i++)
+        {
+            var res = await TupleView.GetAsync(null, GetTuple(i));
+
+            var expectedVal = $"value{i * 10}_arg1_22";
+
+            Assert.IsTrue(res.HasValue, $"Key {i} not found");
+            Assert.AreEqual(expectedVal, res.Value[ValCol]);
+        }
+    }
+
+    [Test]
+    public async Task TestResultConsumerCancellation()
+    {
+        IAsyncEnumerable<string> results = PocoView.StreamDataAsync<int, 
string, string>(
+            Enumerable.Range(0, Count).ToAsyncEnumerable(),
+            keySelector: x => GetPoco(x),
+            payloadSelector: x => $"{x}-value{x * 10}",
+            units: Array.Empty<DeploymentUnit>(),
+            receiverClassName: TestReceiverClassName,
+            receiverArgs: new object[] { Table.Name, "arg1", 22 },
+            options: DataStreamerOptions.Default with { PageSize = 1 });
+
+        var cts = new CancellationTokenSource();
+
+        await using var enumerator = results.GetAsyncEnumerator(cts.Token);
+        Assert.IsTrue(await enumerator.MoveNextAsync());
+
+        // Cancel the resulting enumerator before it's fully consumed. This 
stops the streamer.
+        cts.Cancel();
+        Assert.ThrowsAsync<TaskCanceledException>(async () => await 
enumerator.MoveNextAsync());
+
+        // Only part of the data was streamed.
+        var streamedData = await TupleView.GetAllAsync(null, 
Enumerable.Range(0, Count).Select(x => GetTuple(x)));
+        Assert.Less(streamedData.Count(x => x.HasValue), Count / 2);
+    }
+
     private static async IAsyncEnumerable<IIgniteTuple> GetFakeServerData(int 
count)
     {
         for (var i = 0; i < count; i++)
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
index 5b9d3c2aa8..e7ccd6c478 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
@@ -919,6 +919,11 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
                     AppendNull(); // Value.
                     break;
 
+                case bool b:
+                    AppendTypeAndScale(ColumnType.Boolean);
+                    AppendBool(b);
+                    break;
+
                 case int i32:
                     AppendTypeAndScale(ColumnType.Int32);
                     AppendInt(i32);
@@ -995,6 +1000,16 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
                     AppendTimestamp(instant, timestampPrecision);
                     break;
 
+                case Period period:
+                    AppendTypeAndScale(ColumnType.Period);
+                    AppendPeriod(period);
+                    break;
+
+                case Duration duration:
+                    AppendTypeAndScale(ColumnType.Duration);
+                    AppendDuration(duration);
+                    break;
+
                 case BitArray bitArray:
                     AppendTypeAndScale(ColumnType.Bitmask);
                     AppendBitmask(bitArray);
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleReader.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleReader.cs
index bc6993187b..1931ea9115 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleReader.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleReader.cs
@@ -18,6 +18,7 @@
 namespace Apache.Ignite.Internal.Proto.BinaryTuple
 {
     using System;
+    using System.Buffers;
     using System.Buffers.Binary;
     using System.Collections;
     using System.Diagnostics;
@@ -507,6 +508,42 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple
             return GetObject(index + 2, type, scale);
         }
 
+        /// <summary>
+        /// Gets an object collection with the specified element type.
+        /// Opposite of <see 
cref="BinaryTupleBuilder.AppendObjectCollectionWithType{T}"/>.
+        /// </summary>
+        /// <param name="index">Start index.</param>
+        /// <typeparam name="T">Element type.</typeparam>
+        /// <returns>Pooled array with items and actual item count.</returns>
+        public (T[] Items, int Count) GetObjectCollectionWithType<T>(int index 
= 0)
+        {
+            int typeId = GetInt(index++);
+            int count = GetInt(index++);
+
+            if (count == 0)
+            {
+                return (Array.Empty<T>(), 0);
+            }
+
+            var items = ArrayPool<T>.Shared.Rent(count);
+            var type = (ColumnType)typeId;
+
+            try
+            {
+                for (int i = 0; i < count; i++)
+                {
+                    items[i] = (T)GetObject(index + i, type)!;
+                }
+
+                return (items, count);
+            }
+            catch (Exception)
+            {
+                ArrayPool<T>.Shared.Return(items);
+                throw;
+            }
+        }
+
         private static LocalDate ReadDate(ReadOnlySpan<byte> span)
         {
             // Read int32 from 3 bytes, preserving sign.
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
index bab2437a96..3dbc64269d 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs
@@ -23,9 +23,9 @@ using System.Collections.Generic;
 using System.Diagnostics;
 using System.Diagnostics.CodeAnalysis;
 using System.Linq;
-using System.Runtime.CompilerServices;
 using System.Runtime.InteropServices;
 using System.Threading;
+using System.Threading.Channels;
 using System.Threading.Tasks;
 using Buffers;
 using Common;
@@ -60,7 +60,7 @@ internal static class DataStreamerWithReceiver
     /// <param name="payloadSelector">Payload func.</param>
     /// <param name="keyWriter">Key writer.</param>
     /// <param name="options">Options.</param>
-    /// <param name="expectResults">Whether to expect results from the 
receiver.</param>
+    /// <param name="resultChannel">Channel for results from the receiver. 
Null when results are not expected.</param>
     /// <param name="units">Deployment units. Can be empty.</param>
     /// <param name="receiverClassName">Java class name of the streamer 
receiver to execute on the server.</param>
     /// <param name="receiverArgs">Receiver args.</param>
@@ -70,30 +70,24 @@ internal static class DataStreamerWithReceiver
     /// <typeparam name="TPayload">Payload type.</typeparam>
     /// <typeparam name="TResult">Result type.</typeparam>
     /// <returns>A <see cref="Task"/> representing the asynchronous 
operation.</returns>
-    internal static async IAsyncEnumerable<TResult> StreamDataAsync<TSource, 
TKey, TPayload, TResult>(
+    internal static async Task StreamDataAsync<TSource, TKey, TPayload, 
TResult>(
         IAsyncEnumerable<TSource> data,
         Table table,
         Func<TSource, TKey> keySelector,
         Func<TSource, TPayload> payloadSelector,
         IRecordSerializerHandler<TKey> keyWriter,
         DataStreamerOptions options,
-        bool expectResults,
+        Channel<TResult>? resultChannel,
         IEnumerable<DeploymentUnit> units,
         string receiverClassName,
         ICollection<object>? receiverArgs,
-        [EnumeratorCancellation] CancellationToken cancellationToken)
+        CancellationToken cancellationToken)
         where TKey : notnull
         where TPayload : notnull
     {
         IgniteArgumentCheck.NotNull(data);
         DataStreamer.ValidateOptions(options);
 
-        if (expectResults)
-        {
-            // TODO IGNITE-22356 Support result retrieval.
-            throw new NotSupportedException("Result retrieval is not yet 
supported.");
-        }
-
         // ConcurrentDictionary is not necessary because we consume the source 
sequentially.
         // However, locking for batches is required due to auto-flush 
background task.
         var batches = new Dictionary<int, Batch<TPayload>>();
@@ -148,8 +142,7 @@ internal static class DataStreamerWithReceiver
             }
         }
 
-        // TODO IGNITE-22356 Support result retrieval.
-        yield break;
+        return;
 
         Batch<TPayload> Add(TSource item)
         {
@@ -246,6 +239,7 @@ internal static class DataStreamerWithReceiver
             await Task.Yield();
 
             var buf = ProtoCommon.GetMessageWriter();
+            TResult[]? results = null;
 
             try
             {
@@ -256,13 +250,33 @@ internal static class DataStreamerWithReceiver
 
                 // Wait for the previous batch for this node to preserve item 
order.
                 await oldTask.ConfigureAwait(false);
-                await SendBatchAsync(table, buf, count, preferredNode, 
retryPolicy).ConfigureAwait(false);
+                (results, int resultsCount) = await SendBatchAsync<TResult>(
+                    table, buf, count, preferredNode, retryPolicy, 
expectResults: resultChannel != null).ConfigureAwait(false);
+
+                if (results != null && resultChannel != null)
+                {
+                    for (var i = 0; i < resultsCount; i++)
+                    {
+                        TResult result = results[i];
+                        await resultChannel.Writer.WriteAsync(result, 
cancellationToken).ConfigureAwait(false);
+                    }
+                }
+            }
+            catch (ChannelClosedException)
+            {
+                // Consumer does not want more results, stop returning them, 
but keep streaming.
+                resultChannel = null;
             }
             finally
             {
                 buf.Dispose();
                 GetPool<TPayload>().Return(items);
 
+                if (results != null)
+                {
+                    GetPool<TResult>().Return(results);
+                }
+
                 Metrics.StreamerItemsQueuedDecrement(count);
                 Metrics.StreamerBatchesActiveDecrement();
             }
@@ -311,6 +325,7 @@ internal static class DataStreamerWithReceiver
 
             Compute.WriteUnits(units0, buf);
 
+            var expectResults = resultChannel != null;
             w.Write(expectResults);
             WriteReceiverPayload(ref w, receiverClassName, receiverArgs ?? 
Array.Empty<object>(), items);
         }
@@ -339,12 +354,13 @@ internal static class DataStreamerWithReceiver
         w.Write(builder.Build().Span);
     }
 
-    private static async Task SendBatchAsync(
+    private static async Task<(T[]? ResultsPooledArray, int ResultsCount)> 
SendBatchAsync<T>(
         Table table,
         PooledArrayBuffer buf,
         int count,
         PreferredNode preferredNode,
-        IRetryPolicy retryPolicy)
+        IRetryPolicy retryPolicy,
+        bool expectResults)
     {
         var (resBuf, socket) = await table.Socket.DoOutInOpAndGetSocketAsync(
                 ClientOp.StreamerWithReceiverBatchSend,
@@ -354,10 +370,33 @@ internal static class DataStreamerWithReceiver
                 retryPolicy)
             .ConfigureAwait(false);
 
-        resBuf.Dispose();
+        using (resBuf)
+        {
+            Metrics.StreamerBatchesSent.Add(1, socket.MetricsContext.Tags);
+            Metrics.StreamerItemsSent.Add(count, socket.MetricsContext.Tags);
+
+            return expectResults
+                ? Read(resBuf.GetReader())
+                : (null, 0);
+        }
+
+        static (T[]? ResultsPooledArray, int ResultsCount) Read(MsgPackReader 
reader)
+        {
+            if (reader.TryReadNil())
+            {
+                return (null, 0);
+            }
+
+            var numElements = reader.ReadInt32();
+            if (numElements == 0)
+            {
+                return (null, 0);
+            }
+
+            var tuple = new BinaryTupleReader(reader.ReadBinary(), 
numElements);
 
-        Metrics.StreamerBatchesSent.Add(1, socket.MetricsContext.Tags);
-        Metrics.StreamerItemsSent.Add(count, socket.MetricsContext.Tags);
+            return tuple.GetObjectCollectionWithType<T>();
+        }
     }
 
     private static ArrayPool<T> GetPool<T>() => ArrayPool<T>.Shared;
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index f731a65a43..f96973864e 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -19,10 +19,11 @@ namespace Apache.Ignite.Internal.Table
 {
     using System;
     using System.Collections.Generic;
-    using System.Diagnostics;
     using System.Diagnostics.CodeAnalysis;
     using System.Linq;
+    using System.Runtime.CompilerServices;
     using System.Threading;
+    using System.Threading.Channels;
     using System.Threading.Tasks;
     using Buffers;
     using Common;
@@ -307,7 +308,7 @@ namespace Apache.Ignite.Internal.Table
                 cancellationToken).ConfigureAwait(false);
 
         /// <inheritdoc/>
-        public IAsyncEnumerable<TResult> StreamDataAsync<TSource, TPayload, 
TResult>(
+        public async IAsyncEnumerable<TResult> StreamDataAsync<TSource, 
TPayload, TResult>(
             IAsyncEnumerable<TSource> data,
             Func<TSource, T> keySelector,
             Func<TSource, TPayload> payloadSelector,
@@ -315,20 +316,83 @@ namespace Apache.Ignite.Internal.Table
             string receiverClassName,
             ICollection<object>? receiverArgs,
             DataStreamerOptions? options,
-            CancellationToken cancellationToken = default)
-            where TPayload : notnull =>
-            DataStreamerWithReceiver.StreamDataAsync<TSource, T, TPayload, 
TResult>(
-                data,
-                _table,
-                keySelector,
-                payloadSelector,
-                keyWriter: _ser.Handler,
-                options ?? DataStreamerOptions.Default,
-                expectResults: true,
-                units,
-                receiverClassName,
-                receiverArgs,
-                cancellationToken);
+            [EnumeratorCancellation] CancellationToken cancellationToken = 
default)
+            where TPayload : notnull
+        {
+            options ??= DataStreamerOptions.Default;
+
+            // Validate before using for channel capacity.
+            DataStreamer.ValidateOptions(options);
+
+            // Double the page size to read the next page while the previous 
one is being consumed.
+            var resultChannelCapacity = options.PageSize * 2;
+
+            Channel<TResult> resultChannel = 
Channel.CreateBounded<TResult>(new BoundedChannelOptions(resultChannelCapacity)
+            {
+                // Backpressure - streamer will wait for results to be 
consumed before streaming more.
+                FullMode = BoundedChannelFullMode.Wait,
+
+                // One reader: resulting IAsyncEnumerable.
+                SingleReader = true,
+
+                // Many writers: batches may complete in parallel.
+                SingleWriter = false
+            });
+
+            // Stream in background.
+            var streamTask = Stream();
+
+            // Result async enumerable is returned immediately. It will be 
completed when the streaming completes.
+            var reader = resultChannel.Reader;
+
+            try
+            {
+                while (await 
reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
+                {
+                    while (reader.TryRead(out var item))
+                    {
+                        yield return item;
+                    }
+                }
+            }
+            finally
+            {
+                // Consumer has stopped reading, complete the channel.
+                resultChannel.Writer.TryComplete();
+
+                // Wait for the streamer to complete even if the result 
consumer has stopped reading.
+                await streamTask.ConfigureAwait(false);
+            }
+
+            [SuppressMessage(
+                "Design",
+                "CA1031:Do not catch general exception types",
+                Justification = "All exceptions should be propagated to the 
result channel.")]
+            async Task Stream()
+            {
+                try
+                {
+                    await DataStreamerWithReceiver.StreamDataAsync(
+                        data,
+                        _table,
+                        keySelector,
+                        payloadSelector,
+                        keyWriter: _ser.Handler,
+                        options,
+                        resultChannel,
+                        units,
+                        receiverClassName,
+                        receiverArgs,
+                        cancellationToken).ConfigureAwait(false);
+
+                    resultChannel.Writer.Complete();
+                }
+                catch (Exception e)
+                {
+                    resultChannel.Writer.TryComplete(e);
+                }
+            }
+        }
 
         /// <inheritdoc/>
         public async Task StreamDataAsync<TSource, TPayload>(
@@ -342,24 +406,18 @@ namespace Apache.Ignite.Internal.Table
             CancellationToken cancellationToken = default)
             where TPayload : notnull
         {
-            IAsyncEnumerable<object> results = 
DataStreamerWithReceiver.StreamDataAsync<TSource, T, TPayload, object>(
+            await DataStreamerWithReceiver.StreamDataAsync<TSource, T, 
TPayload, object>(
                 data,
                 _table,
                 keySelector,
                 payloadSelector,
                 keyWriter: _ser.Handler,
                 options ?? DataStreamerOptions.Default,
-                expectResults: false,
+                resultChannel: null,
                 units,
                 receiverClassName,
                 receiverArgs,
-                cancellationToken);
-
-            // Await streaming completion.
-            await foreach (var unused in results)
-            {
-                Debug.Fail("Got results with expectResults=false: " + unused);
-            }
+                cancellationToken).ConfigureAwait(false);
         }
 
         /// <inheritdoc/>
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs 
b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
index 31d3aae978..c5dd1aef5d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs
@@ -79,7 +79,12 @@ public interface IDataStreamerTarget<T>
     /// <param name="receiverArgs">Receiver args.</param>
     /// <param name="options">Streamer options.</param>
     /// <param name="cancellationToken">Cancellation token.</param>
-    /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
+    /// <returns>
+    /// A <see cref="IAsyncEnumerable{T}"/> with the results from the receiver.
+    /// <para />
+    /// The resulting async enumerator applies back-pressure to the data source, so it should be either fully consumed
+    /// or disposed to complete the streaming. Disposing the enumerator before it is fully consumed will ignore the remaining results.
+    /// </returns>
     /// <typeparam name="TSource">Source item type.</typeparam>
     /// <typeparam name="TPayload">Payload type.</typeparam>
     /// <typeparam name="TResult">Result type.</typeparam>
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
index d66ba86956..2539d24935 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
@@ -790,18 +790,22 @@ public class PlatformTestNodeRunner {
 
             Table table = ctx.ignite().tables().table(tableName);
             RecordView<Tuple> recordView = table.recordView();
+            List<String> res = new ArrayList<>();
 
             for (String s : page) {
                 String[] parts = s.split("-", 2);
+                String val = parts[1] + "_" + arg1 + "_" + arg2;
 
                 Tuple rec = Tuple.create()
                         .set("key", Long.parseLong(parts[0]))
-                        .set("val", parts[1] + "_" + arg1 + "_" + arg2);
+                        .set("val", val);
+
+                res.add(val);
 
                 recordView.upsert(null, rec);
             }
 
-            return null;
+            return CompletableFuture.completedFuture(res);
         }
     }
 
@@ -834,4 +838,20 @@ public class PlatformTestNodeRunner {
             return null;
         }
     }
+
+    @SuppressWarnings("unused") // Used by platform tests.
+    private static class EchoArgsReceiver implements 
DataStreamerReceiver<Object, Object> {
+        @Override
+        public CompletableFuture<List<Object>> receive(List<Object> page, 
DataStreamerReceiverContext ctx, Object... args) {
+            return CompletableFuture.completedFuture(List.of(args));
+        }
+    }
+
+    @SuppressWarnings("unused") // Used by platform tests.
+    private static class EchoReceiver implements DataStreamerReceiver<Object, 
Object> {
+        @Override
+        public CompletableFuture<List<Object>> receive(List<Object> page, 
DataStreamerReceiverContext ctx, Object... args) {
+            return CompletableFuture.completedFuture(page);
+        }
+    }
 }


Reply via email to