This is an automated email from the ASF dual-hosted git repository. ptupitsyn pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 9a467ff103 IGNITE-22356 .NET: Add results support to Data Streamer with receiver (#3935) 9a467ff103 is described below commit 9a467ff103743ec1f78dfe7aa34201de781ef41e Author: Pavel Tupitsyn <ptupit...@apache.org> AuthorDate: Mon Jun 17 17:19:58 2024 +0300 IGNITE-22356 .NET: Add results support to Data Streamer with receiver (#3935) * Implement receiver results deserialization * Pass the results to the user as `IAsyncEnumerable<TResult>` * Resulting `IAsyncEnumerable` applies back-pressure to the streamer (we don't want to buffer an infinite number of results), so the user has to consume the results to complete the streaming * The user can choose to consume the results partially by disposing the enumerator (or ending the `await foreach` loop). We still complete the streaming in this case, but discard the remaining results * If the resulting `IAsyncEnumerable` is cancelled, the streamer is cancelled too --- .../Proto/BinaryTuple/BinaryTupleTests.cs | 8 +- .../Apache.Ignite.Tests/Table/DataStreamerTests.cs | 261 ++++++++++++++++++++- .../Proto/BinaryTuple/BinaryTupleBuilder.cs | 15 ++ .../Proto/BinaryTuple/BinaryTupleReader.cs | 37 +++ .../Internal/Table/DataStreamerWithReceiver.cs | 77 ++++-- .../Apache.Ignite/Internal/Table/RecordView.cs | 108 +++++++-- .../Apache.Ignite/Table/IDataStreamerTarget.cs | 7 +- .../runner/app/PlatformTestNodeRunner.java | 24 +- 8 files changed, 487 insertions(+), 50 deletions(-) diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/BinaryTuple/BinaryTupleTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/BinaryTuple/BinaryTupleTests.cs index 02e5691722..9b93e66287 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/BinaryTuple/BinaryTupleTests.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/BinaryTuple/BinaryTupleTests.cs @@ -839,8 +839,11 @@ namespace Apache.Ignite.Tests.Proto.BinaryTuple b.AppendObjectWithType(date); 
b.AppendObjectWithType(dateTime); b.AppendObjectWithType(Instant.FromDateTimeUtc(utcNow)); + b.AppendObjectWithType(true); + b.AppendObjectWithType(Period.FromDays(2)); + b.AppendObjectWithType(Duration.FromDays(3)); }, - 17 * 3); + 20 * 3); Assert.IsNull(reader.GetObject(0)); Assert.AreEqual(sbyte.MaxValue, reader.GetObject(3)); @@ -859,6 +862,9 @@ namespace Apache.Ignite.Tests.Proto.BinaryTuple Assert.AreEqual(date, reader.GetObject(42)); Assert.AreEqual(dateTime, reader.GetObject(45)); Assert.AreEqual(Instant.FromDateTimeUtc(utcNow), reader.GetObject(48)); + Assert.IsTrue((bool)reader.GetObject(51)!); + Assert.AreEqual(Period.FromDays(2), reader.GetObject(54)); + Assert.AreEqual(Duration.FromDays(3), reader.GetObject(57)); } [Test] diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs index 9db142969c..7b7d73f958 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs @@ -22,6 +22,7 @@ using System.Collections; using System.Collections.Generic; using System.Data; using System.Linq; +using System.Numerics; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -42,6 +43,8 @@ public class DataStreamerTests : IgniteTestsBase { private const string TestReceiverClassName = ComputeTests.PlatformTestNodeRunner + "$TestReceiver"; + private const string EchoArgsReceiverClassName = ComputeTests.PlatformTestNodeRunner + "$EchoArgsReceiver"; + private const string UpsertElementTypeNameReceiverClassName = ComputeTests.PlatformTestNodeRunner + "$UpsertElementTypeNameReceiver"; private const int Count = 100; @@ -50,6 +53,29 @@ public class DataStreamerTests : IgniteTestsBase private const int DeletedKey = Count + 1; + private static readonly object[] AllSupportedTypes = + { + true, + sbyte.MaxValue, + short.MinValue, + 
int.MaxValue, + long.MinValue, + float.MaxValue, + double.MinValue, + decimal.One, + new LocalDate(1234, 5, 6), + new LocalTime(12, 3, 4, 567), + new LocalDateTime(1234, 5, 6, 7, 8, 9), + Instant.FromUnixTimeSeconds(123456), + Guid.Empty, + new BitArray(new[] { byte.MaxValue }), + "str123", + new byte[] { 1, 2, 3 }, + Period.FromDays(999), + Duration.FromSeconds(12345), + new BigInteger(12.34) + }; + private static int _unknownKey = 333000; [SetUp] @@ -179,7 +205,7 @@ public class DataStreamerTests : IgniteTestsBase } [Test] - public void TestOptionsValidation() + public void TestOptionsValidation([Values(true, false, null)] bool? withReceiverResults) { AssertException(DataStreamerOptions.Default with { PageSize = -10 }, "PageSize should be positive."); AssertException(DataStreamerOptions.Default with { RetryLimit = -1 }, "RetryLimit should be non-negative."); @@ -190,7 +216,42 @@ public class DataStreamerTests : IgniteTestsBase void AssertException(DataStreamerOptions options, string message) { var ex = Assert.ThrowsAsync<ArgumentException>( - async () => await Table.RecordBinaryView.StreamDataAsync(Array.Empty<IIgniteTuple>().ToAsyncEnumerable(), options)); + async () => + { + switch (withReceiverResults) + { + // No receiver. + case null: + await Table.RecordBinaryView.StreamDataAsync(Array.Empty<IIgniteTuple>().ToAsyncEnumerable(), options); + break; + + // Receiver without results. + case false: + await Table.RecordBinaryView.StreamDataAsync<IIgniteTuple, string>( + Array.Empty<IIgniteTuple>().ToAsyncEnumerable(), + t => t, + t => t.ToString()!, + Array.Empty<DeploymentUnit>(), + TestReceiverClassName, + null, + options); + + break; + + // Receiver with results. 
+ case true: + await Table.RecordBinaryView.StreamDataAsync<IIgniteTuple, string, string>( + Array.Empty<IIgniteTuple>().ToAsyncEnumerable(), + t => t, + t => t.ToString()!, + Array.Empty<DeploymentUnit>(), + TestReceiverClassName, + null, + options).ToListAsync(); + + break; + } + }); StringAssert.Contains(message, ex?.Message); } @@ -332,6 +393,35 @@ public class DataStreamerTests : IgniteTestsBase } } + [Test] + public async Task TestWithReceiverWithResultsRecordBinaryView() + { + IAsyncEnumerable<string> results = TupleView.StreamDataAsync<int, string, string>( + Enumerable.Range(0, Count).ToAsyncEnumerable(), + keySelector: x => GetTuple(x), + payloadSelector: x => $"{x}-value{x * 10}", + units: Array.Empty<DeploymentUnit>(), + receiverClassName: TestReceiverClassName, + receiverArgs: new object[] { Table.Name, "arg1", 22 }, + options: DataStreamerOptions.Default); + + var resultSet = await results.ToHashSetAsync(); + + for (int i = 0; i < Count; i++) + { + var res = await TupleView.GetAsync(null, GetTuple(i)); + + var expectedVal = $"value{i * 10}_arg1_22"; + + Assert.IsTrue(res.HasValue); + Assert.AreEqual(expectedVal, res.Value[ValCol]); + + CollectionAssert.Contains(resultSet, expectedVal); + } + + Assert.AreEqual(Count, resultSet.Count); + } + [Test] public async Task TestWithReceiverRecordView() { @@ -353,6 +443,35 @@ public class DataStreamerTests : IgniteTestsBase } } + [Test] + public async Task TestWithReceiverResultsRecordView() + { + IAsyncEnumerable<string> results = PocoView.StreamDataAsync<int, string, string>( + Enumerable.Range(0, Count).ToAsyncEnumerable(), + keySelector: x => GetPoco(x), + payloadSelector: x => $"{x}-value{x * 10}", + units: Array.Empty<DeploymentUnit>(), + receiverClassName: TestReceiverClassName, + receiverArgs: new object[] { Table.Name, "arg1", 22 }, + options: DataStreamerOptions.Default); + + var resultSet = await results.ToHashSetAsync(); + + for (int i = 0; i < Count; i++) + { + var res = await 
TupleView.GetAsync(null, GetTuple(i)); + + var expectedVal = $"value{i * 10}_arg1_22"; + + Assert.IsTrue(res.HasValue); + Assert.AreEqual(expectedVal, res.Value[ValCol]); + + CollectionAssert.Contains(resultSet, expectedVal); + } + + Assert.AreEqual(Count, resultSet.Count); + } + [Test] public async Task TestWithReceiverKeyValueBinaryView() { @@ -373,6 +492,34 @@ public class DataStreamerTests : IgniteTestsBase } } + [Test] + public async Task TestWithReceiverResultsKeyValueBinaryView() + { + IAsyncEnumerable<string> results = Table.KeyValueBinaryView.StreamDataAsync<int, string, string>( + Enumerable.Range(0, Count).ToAsyncEnumerable(), + keySelector: x => new KeyValuePair<IIgniteTuple, IIgniteTuple>(GetTuple(x), new IgniteTuple()), + payloadSelector: x => $"{x}-value{x * 10}", + units: Array.Empty<DeploymentUnit>(), + receiverClassName: TestReceiverClassName, + receiverArgs: new object[] { Table.Name, "arg1", 22 }); + + var resultSet = await results.ToHashSetAsync(); + + for (int i = 0; i < Count; i++) + { + var res = await TupleView.GetAsync(null, GetTuple(i)); + + var expectedVal = $"value{i * 10}_arg1_22"; + + Assert.IsTrue(res.HasValue); + Assert.AreEqual(expectedVal, res.Value[ValCol]); + + CollectionAssert.Contains(resultSet, expectedVal); + } + + Assert.AreEqual(Count, resultSet.Count); + } + [Test] public async Task TestWithReceiverKeyValueView() { @@ -393,6 +540,34 @@ public class DataStreamerTests : IgniteTestsBase } } + [Test] + public async Task TestWithReceiverResultsKeyValueView() + { + IAsyncEnumerable<string> results = Table.GetKeyValueView<long, Poco>().StreamDataAsync<int, string, string>( + Enumerable.Range(0, Count).ToAsyncEnumerable(), + keySelector: x => new KeyValuePair<long, Poco>(x, null!), + payloadSelector: x => $"{x}-value{x * 10}", + units: Array.Empty<DeploymentUnit>(), + receiverClassName: TestReceiverClassName, + receiverArgs: new object[] { Table.Name, "arg11", 55}); + + var resultSet = await results.ToHashSetAsync(); + + for (int 
i = 0; i < Count; i++) + { + var res = await TupleView.GetAsync(null, GetTuple(i)); + + var expectedVal = $"value{i * 10}_arg11_55"; + + Assert.IsTrue(res.HasValue); + Assert.AreEqual(expectedVal, res.Value[ValCol]); + + CollectionAssert.Contains(resultSet, expectedVal); + } + + Assert.AreEqual(Count, resultSet.Count); + } + [Test] public void TestUnknownReceiverClass() { @@ -422,6 +597,21 @@ public class DataStreamerTests : IgniteTestsBase Assert.AreEqual("Streamer receiver failed: Job execution failed: java.lang.ArithmeticException: Test exception: 1", ex.Message); } + [Test] + public void TestReceiverWithResultsException() + { + var ex = Assert.ThrowsAsync<IgniteException>(async () => + await PocoView.StreamDataAsync<int, string, string>( + Enumerable.Range(0, 1).ToAsyncEnumerable(), + keySelector: x => GetPoco(x), + payloadSelector: _ => string.Empty, + units: Array.Empty<DeploymentUnit>(), + receiverClassName: TestReceiverClassName, + receiverArgs: new object[] { "throw", "throw", 1 }).ToListAsync()); + + Assert.AreEqual("Streamer receiver failed: Job execution failed: java.lang.ArithmeticException: Test exception: 1", ex.Message); + } + [Test] public void TestReceiverSelectorException([Values(true, false)] bool keySelector) { @@ -509,6 +699,73 @@ public class DataStreamerTests : IgniteTestsBase ex.Message); } + [TestCaseSource(nameof(AllSupportedTypes))] + public async Task TestEchoReceiverAllDataTypes(object arg) + { + var res = await PocoView.StreamDataAsync<object, object, object>( + new object[] { 1 }.ToAsyncEnumerable(), + keySelector: x => new Poco(), + payloadSelector: x => x.ToString()!, + units: Array.Empty<DeploymentUnit>(), + receiverClassName: EchoArgsReceiverClassName, + receiverArgs: new[] { arg }).SingleAsync(); + + Assert.AreEqual(arg, res); + } + + [Test] + public async Task TestResultConsumerEarlyExit() + { + IAsyncEnumerable<string> results = PocoView.StreamDataAsync<int, string, string>( + Enumerable.Range(0, Count).ToAsyncEnumerable(), + 
keySelector: x => GetPoco(x), + payloadSelector: x => $"{x}-value{x * 10}", + units: Array.Empty<DeploymentUnit>(), + receiverClassName: TestReceiverClassName, + receiverArgs: new object[] { Table.Name, "arg1", 22 }, + options: DataStreamerOptions.Default with { PageSize = 1 }); + + // Read only part of the results. + var resultSet = await results.Take(3).ToListAsync(); + Assert.AreEqual(3, resultSet.Count); + + for (int i = 0; i < Count; i++) + { + var res = await TupleView.GetAsync(null, GetTuple(i)); + + var expectedVal = $"value{i * 10}_arg1_22"; + + Assert.IsTrue(res.HasValue, $"Key {i} not found"); + Assert.AreEqual(expectedVal, res.Value[ValCol]); + } + } + + [Test] + public async Task TestResultConsumerCancellation() + { + IAsyncEnumerable<string> results = PocoView.StreamDataAsync<int, string, string>( + Enumerable.Range(0, Count).ToAsyncEnumerable(), + keySelector: x => GetPoco(x), + payloadSelector: x => $"{x}-value{x * 10}", + units: Array.Empty<DeploymentUnit>(), + receiverClassName: TestReceiverClassName, + receiverArgs: new object[] { Table.Name, "arg1", 22 }, + options: DataStreamerOptions.Default with { PageSize = 1 }); + + var cts = new CancellationTokenSource(); + + await using var enumerator = results.GetAsyncEnumerator(cts.Token); + Assert.IsTrue(await enumerator.MoveNextAsync()); + + // Cancel the resulting enumerator before it's fully consumed. This stops the streamer. + cts.Cancel(); + Assert.ThrowsAsync<TaskCanceledException>(async () => await enumerator.MoveNextAsync()); + + // Only part of the data was streamed. 
+ var streamedData = await TupleView.GetAllAsync(null, Enumerable.Range(0, Count).Select(x => GetTuple(x))); + Assert.Less(streamedData.Count(x => x.HasValue), Count / 2); + } + private static async IAsyncEnumerable<IIgniteTuple> GetFakeServerData(int count) { for (var i = 0; i < count; i++) diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs index 5b9d3c2aa8..e7ccd6c478 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs @@ -919,6 +919,11 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple AppendNull(); // Value. break; + case bool b: + AppendTypeAndScale(ColumnType.Boolean); + AppendBool(b); + break; + case int i32: AppendTypeAndScale(ColumnType.Int32); AppendInt(i32); @@ -995,6 +1000,16 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple AppendTimestamp(instant, timestampPrecision); break; + case Period period: + AppendTypeAndScale(ColumnType.Period); + AppendPeriod(period); + break; + + case Duration duration: + AppendTypeAndScale(ColumnType.Duration); + AppendDuration(duration); + break; + case BitArray bitArray: AppendTypeAndScale(ColumnType.Bitmask); AppendBitmask(bitArray); diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleReader.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleReader.cs index bc6993187b..1931ea9115 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleReader.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleReader.cs @@ -18,6 +18,7 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple { using System; + using System.Buffers; using System.Buffers.Binary; using System.Collections; using System.Diagnostics; @@ -507,6 
+508,42 @@ namespace Apache.Ignite.Internal.Proto.BinaryTuple return GetObject(index + 2, type, scale); } + /// <summary> + /// Gets an object collection with the specified element type. + /// Opposite of <see cref="BinaryTupleBuilder.AppendObjectCollectionWithType{T}"/>. + /// </summary> + /// <param name="index">Start index.</param> + /// <typeparam name="T">Element type.</typeparam> + /// <returns>Pooled array with items and actual item count.</returns> + public (T[] Items, int Count) GetObjectCollectionWithType<T>(int index = 0) + { + int typeId = GetInt(index++); + int count = GetInt(index++); + + if (count == 0) + { + return (Array.Empty<T>(), 0); + } + + var items = ArrayPool<T>.Shared.Rent(count); + var type = (ColumnType)typeId; + + try + { + for (int i = 0; i < count; i++) + { + items[i] = (T)GetObject(index + i, type)!; + } + + return (items, count); + } + catch (Exception) + { + ArrayPool<T>.Shared.Return(items); + throw; + } + } + private static LocalDate ReadDate(ReadOnlySpan<byte> span) { // Read int32 from 3 bytes, preserving sign. 
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs index bab2437a96..3dbc64269d 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/DataStreamerWithReceiver.cs @@ -23,9 +23,9 @@ using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Linq; -using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Buffers; using Common; @@ -60,7 +60,7 @@ internal static class DataStreamerWithReceiver /// <param name="payloadSelector">Payload func.</param> /// <param name="keyWriter">Key writer.</param> /// <param name="options">Options.</param> - /// <param name="expectResults">Whether to expect results from the receiver.</param> + /// <param name="resultChannel">Channel for results from the receiver. Null when results are not expected.</param> /// <param name="units">Deployment units. 
Can be empty.</param> /// <param name="receiverClassName">Java class name of the streamer receiver to execute on the server.</param> /// <param name="receiverArgs">Receiver args.</param> @@ -70,30 +70,24 @@ internal static class DataStreamerWithReceiver /// <typeparam name="TPayload">Payload type.</typeparam> /// <typeparam name="TResult">Result type.</typeparam> /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns> - internal static async IAsyncEnumerable<TResult> StreamDataAsync<TSource, TKey, TPayload, TResult>( + internal static async Task StreamDataAsync<TSource, TKey, TPayload, TResult>( IAsyncEnumerable<TSource> data, Table table, Func<TSource, TKey> keySelector, Func<TSource, TPayload> payloadSelector, IRecordSerializerHandler<TKey> keyWriter, DataStreamerOptions options, - bool expectResults, + Channel<TResult>? resultChannel, IEnumerable<DeploymentUnit> units, string receiverClassName, ICollection<object>? receiverArgs, - [EnumeratorCancellation] CancellationToken cancellationToken) + CancellationToken cancellationToken) where TKey : notnull where TPayload : notnull { IgniteArgumentCheck.NotNull(data); DataStreamer.ValidateOptions(options); - if (expectResults) - { - // TODO IGNITE-22356 Support result retrieval. - throw new NotSupportedException("Result retrieval is not yet supported."); - } - // ConcurrentDictionary is not necessary because we consume the source sequentially. // However, locking for batches is required due to auto-flush background task. var batches = new Dictionary<int, Batch<TPayload>>(); @@ -148,8 +142,7 @@ internal static class DataStreamerWithReceiver } } - // TODO IGNITE-22356 Support result retrieval. - yield break; + return; Batch<TPayload> Add(TSource item) { @@ -246,6 +239,7 @@ internal static class DataStreamerWithReceiver await Task.Yield(); var buf = ProtoCommon.GetMessageWriter(); + TResult[]? 
results = null; try { @@ -256,13 +250,33 @@ internal static class DataStreamerWithReceiver // Wait for the previous batch for this node to preserve item order. await oldTask.ConfigureAwait(false); - await SendBatchAsync(table, buf, count, preferredNode, retryPolicy).ConfigureAwait(false); + (results, int resultsCount) = await SendBatchAsync<TResult>( + table, buf, count, preferredNode, retryPolicy, expectResults: resultChannel != null).ConfigureAwait(false); + + if (results != null && resultChannel != null) + { + for (var i = 0; i < resultsCount; i++) + { + TResult result = results[i]; + await resultChannel.Writer.WriteAsync(result, cancellationToken).ConfigureAwait(false); + } + } + } + catch (ChannelClosedException) + { + // Consumer does not want more results, stop returning them, but keep streaming. + resultChannel = null; } finally { buf.Dispose(); GetPool<TPayload>().Return(items); + if (results != null) + { + GetPool<TResult>().Return(results); + } + Metrics.StreamerItemsQueuedDecrement(count); Metrics.StreamerBatchesActiveDecrement(); } @@ -311,6 +325,7 @@ internal static class DataStreamerWithReceiver Compute.WriteUnits(units0, buf); + var expectResults = resultChannel != null; w.Write(expectResults); WriteReceiverPayload(ref w, receiverClassName, receiverArgs ?? Array.Empty<object>(), items); } @@ -339,12 +354,13 @@ internal static class DataStreamerWithReceiver w.Write(builder.Build().Span); } - private static async Task SendBatchAsync( + private static async Task<(T[]? 
ResultsPooledArray, int ResultsCount)> SendBatchAsync<T>( Table table, PooledArrayBuffer buf, int count, PreferredNode preferredNode, - IRetryPolicy retryPolicy) + IRetryPolicy retryPolicy, + bool expectResults) { var (resBuf, socket) = await table.Socket.DoOutInOpAndGetSocketAsync( ClientOp.StreamerWithReceiverBatchSend, @@ -354,10 +370,33 @@ internal static class DataStreamerWithReceiver retryPolicy) .ConfigureAwait(false); - resBuf.Dispose(); + using (resBuf) + { + Metrics.StreamerBatchesSent.Add(1, socket.MetricsContext.Tags); + Metrics.StreamerItemsSent.Add(count, socket.MetricsContext.Tags); + + return expectResults + ? Read(resBuf.GetReader()) + : (null, 0); + } + + static (T[]? ResultsPooledArray, int ResultsCount) Read(MsgPackReader reader) + { + if (reader.TryReadNil()) + { + return (null, 0); + } + + var numElements = reader.ReadInt32(); + if (numElements == 0) + { + return (null, 0); + } + + var tuple = new BinaryTupleReader(reader.ReadBinary(), numElements); - Metrics.StreamerBatchesSent.Add(1, socket.MetricsContext.Tags); - Metrics.StreamerItemsSent.Add(count, socket.MetricsContext.Tags); + return tuple.GetObjectCollectionWithType<T>(); + } } private static ArrayPool<T> GetPool<T>() => ArrayPool<T>.Shared; diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs index f731a65a43..f96973864e 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs @@ -19,10 +19,11 @@ namespace Apache.Ignite.Internal.Table { using System; using System.Collections.Generic; - using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Linq; + using System.Runtime.CompilerServices; using System.Threading; + using System.Threading.Channels; using System.Threading.Tasks; using Buffers; using Common; @@ -307,7 +308,7 @@ namespace Apache.Ignite.Internal.Table 
cancellationToken).ConfigureAwait(false); /// <inheritdoc/> - public IAsyncEnumerable<TResult> StreamDataAsync<TSource, TPayload, TResult>( + public async IAsyncEnumerable<TResult> StreamDataAsync<TSource, TPayload, TResult>( IAsyncEnumerable<TSource> data, Func<TSource, T> keySelector, Func<TSource, TPayload> payloadSelector, @@ -315,20 +316,83 @@ namespace Apache.Ignite.Internal.Table string receiverClassName, ICollection<object>? receiverArgs, DataStreamerOptions? options, - CancellationToken cancellationToken = default) - where TPayload : notnull => - DataStreamerWithReceiver.StreamDataAsync<TSource, T, TPayload, TResult>( - data, - _table, - keySelector, - payloadSelector, - keyWriter: _ser.Handler, - options ?? DataStreamerOptions.Default, - expectResults: true, - units, - receiverClassName, - receiverArgs, - cancellationToken); + [EnumeratorCancellation] CancellationToken cancellationToken = default) + where TPayload : notnull + { + options ??= DataStreamerOptions.Default; + + // Validate before using for channel capacity. + DataStreamer.ValidateOptions(options); + + // Double the page size to read the next page while the previous one is being consumed. + var resultChannelCapacity = options.PageSize * 2; + + Channel<TResult> resultChannel = Channel.CreateBounded<TResult>(new BoundedChannelOptions(resultChannelCapacity) + { + // Backpressure - streamer will wait for results to be consumed before streaming more. + FullMode = BoundedChannelFullMode.Wait, + + // One reader: resulting IAsyncEnumerable. + SingleReader = true, + + // Many writers: batches may complete in parallel. + SingleWriter = false + }); + + // Stream in background. + var streamTask = Stream(); + + // Result async enumerable is returned immediately. It will be completed when the streaming completes. 
+ var reader = resultChannel.Reader; + + try + { + while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) + { + while (reader.TryRead(out var item)) + { + yield return item; + } + } + } + finally + { + // Consumer has stopped reading, complete the channel. + resultChannel.Writer.TryComplete(); + + // Wait for the streamer to complete even if the result consumer has stopped reading. + await streamTask.ConfigureAwait(false); + } + + [SuppressMessage( + "Design", + "CA1031:Do not catch general exception types", + Justification = "All exceptions should be propagated to the result channel.")] + async Task Stream() + { + try + { + await DataStreamerWithReceiver.StreamDataAsync( + data, + _table, + keySelector, + payloadSelector, + keyWriter: _ser.Handler, + options, + resultChannel, + units, + receiverClassName, + receiverArgs, + cancellationToken).ConfigureAwait(false); + + resultChannel.Writer.Complete(); + } + catch (Exception e) + { + resultChannel.Writer.TryComplete(e); + } + } + } /// <inheritdoc/> public async Task StreamDataAsync<TSource, TPayload>( @@ -342,24 +406,18 @@ namespace Apache.Ignite.Internal.Table CancellationToken cancellationToken = default) where TPayload : notnull { - IAsyncEnumerable<object> results = DataStreamerWithReceiver.StreamDataAsync<TSource, T, TPayload, object>( + await DataStreamerWithReceiver.StreamDataAsync<TSource, T, TPayload, object>( data, _table, keySelector, payloadSelector, keyWriter: _ser.Handler, options ?? DataStreamerOptions.Default, - expectResults: false, + resultChannel: null, units, receiverClassName, receiverArgs, - cancellationToken); - - // Await streaming completion. 
- await foreach (var unused in results) - { - Debug.Fail("Got results with expectResults=false: " + unused); - } + cancellationToken).ConfigureAwait(false); } /// <inheritdoc/> diff --git a/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs index 31d3aae978..c5dd1aef5d 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Table/IDataStreamerTarget.cs @@ -79,7 +79,12 @@ public interface IDataStreamerTarget<T> /// <param name="receiverArgs">Receiver args.</param> /// <param name="options">Streamer options.</param> /// <param name="cancellationToken">Cancellation token.</param> - /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns> + /// <returns> + /// A <see cref="IAsyncEnumerable{T}"/> with the results from the receiver. + /// <para /> + /// The resulting async enumerator applies back-pressure to the data source, so it should be either fully consumed + /// or disposed to complete the streaming. Disposing the enumerator before it is fully consumed will ignore the remaining results. 
+ /// </returns> /// <typeparam name="TSource">Source item type.</typeparam> /// <typeparam name="TPayload">Payload type.</typeparam> /// <typeparam name="TResult">Result type.</typeparam> diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java index d66ba86956..2539d24935 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java @@ -790,18 +790,22 @@ public class PlatformTestNodeRunner { Table table = ctx.ignite().tables().table(tableName); RecordView<Tuple> recordView = table.recordView(); + List<String> res = new ArrayList<>(); for (String s : page) { String[] parts = s.split("-", 2); + String val = parts[1] + "_" + arg1 + "_" + arg2; Tuple rec = Tuple.create() .set("key", Long.parseLong(parts[0])) - .set("val", parts[1] + "_" + arg1 + "_" + arg2); + .set("val", val); + + res.add(val); recordView.upsert(null, rec); } - return null; + return CompletableFuture.completedFuture(res); } } @@ -834,4 +838,20 @@ public class PlatformTestNodeRunner { return null; } } + + @SuppressWarnings("unused") // Used by platform tests. + private static class EchoArgsReceiver implements DataStreamerReceiver<Object, Object> { + @Override + public CompletableFuture<List<Object>> receive(List<Object> page, DataStreamerReceiverContext ctx, Object... args) { + return CompletableFuture.completedFuture(List.of(args)); + } + } + + @SuppressWarnings("unused") // Used by platform tests. + private static class EchoReceiver implements DataStreamerReceiver<Object, Object> { + @Override + public CompletableFuture<List<Object>> receive(List<Object> page, DataStreamerReceiverContext ctx, Object... 
args) { + return CompletableFuture.completedFuture(page); + } + } }