birschick-bq commented on code in PR #3397:
URL: https://github.com/apache/arrow-adbc/pull/3397#discussion_r2338069986


##########
csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporter.cs:
##########
@@ -120,28 +120,54 @@ public override ExportResult Export(in Batch<Activity> 
batch)
             foreach (Activity activity in batch)
             {
                 if (activity == null) continue;
-                _activityQueue.Enqueue(activity);
+                _ = _channel.Writer.TryWrite(activity);
             }
             return ExportResult.Success;
         }
 
+        protected override bool OnForceFlush(int timeoutMilliseconds)
+        {
+            CancellationTokenSource cts = new CancellationTokenSource();
+            cts.CancelAfter(TimeSpan.FromMilliseconds(timeoutMilliseconds));
+            Task.Delay(100).ConfigureAwait(false).GetAwaiter().GetResult();
+            while (!cts.IsCancellationRequested && 
!_channel.Reader.Completion.IsCompleted && _channel.Reader.Count > 0)
+            {
+                Task.Delay(100).ConfigureAwait(false).GetAwaiter().GetResult();
+            }
+            return true;
+        }
+
         private static async Task ProcessActivitiesAsync(FileExporter 
fileExporter, CancellationToken cancellationToken)
         {
             try
             {
-                TimeSpan delay = TimeSpan.FromMilliseconds(100);
-                // Polls for and then writes any activities in the queue
-                while (!cancellationToken.IsCancellationRequested)
+                using MemoryStream stream = new();
+                await foreach (Activity activity in 
fileExporter._channel.Reader.ReadAllAsync(cancellationToken))
                 {
-                    await Task.Delay(delay, cancellationToken);
-                    await 
fileExporter._tracingFile.WriteLinesAsync(GetActivitiesAsync(fileExporter._activityQueue),
 cancellationToken);
+                    if (cancellationToken.IsCancellationRequested) break;
+
+                    stream.SetLength(0);
+                    SerializableActivity serializableActivity = new(activity);
+                    await JsonSerializer.SerializeAsync(
+                        stream,
+                        serializableActivity);
+                    stream.Write(s_newLine, 0, s_newLine.Length);
+                    stream.Position = 0;
+
+                    await fileExporter._tracingFile.WriteLineAsync(stream, 
cancellationToken);
                 }
             }
+            catch (OperationCanceledException ex)
+            {
+                // Expected when cancellationToken is cancelled.
+                Console.WriteLine(ex);

Review Comment:
   Will use `Trace.TraceError` instead of `Console.WriteLine` here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to