This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new d548313a4b Add fine-grained stream error logging control (#2805)
d548313a4b is described below

commit d548313a4b31196def8084fe9d88aa191a88dcfb
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Mon Mar 30 20:16:36 2026 +0800

    Add fine-grained stream error logging control (#2805)
    
    Add configurable stream stage error log level via
    'pekko.stream.materializer.stage-errors-default-log-level' config key.
    Supports 'error' (default), 'warning', 'info', 'debug', or 'off'.
    
    When a stage-specific LogLevels attribute is present, it takes
    precedence. Otherwise the system-wide default is used, enabling
    operators to reduce noise from expected stream errors without
    per-stage configuration.
    
    Changes:
    - Add stage-errors-default-log-level to reference.conf
    - Add LogLevels.defaultErrorLevel and LogLevels.fromString helpers
    - Update GraphInterpreter.reportStageError to use per-level logging
    - Update RestartFlow.loggingEnabled to respect system-wide setting
    - Change NoMaterializer from object to case class accepting ActorSystem
      (required because GraphInterpreter now accesses materializer.system)
    
    Upstream: akka/akka-core@519d33d897
    Cherry-picked from akka/akka-core v2.8.0, which is now Apache licensed.
    
    Co-authored-by: Copilot <[email protected]>
---
 .../impl/fusing/GraphInterpreterSpecKit.scala      |  7 ++-----
 stream/src/main/resources/reference.conf           |  5 +++++
 .../scala/org/apache/pekko/stream/Attributes.scala | 19 +++++++++++++++++++
 .../stream/impl/fusing/GraphInterpreter.scala      | 22 +++++++++++++++++-----
 .../apache/pekko/stream/scaladsl/RestartFlow.scala |  4 +++-
 5 files changed, 46 insertions(+), 11 deletions(-)

diff --git 
a/stream-testkit/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpecKit.scala
 
b/stream-testkit/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpecKit.scala
index 46bf05c5ac..bfe8d92599 100644
--- 
a/stream-testkit/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpecKit.scala
+++ 
b/stream-testkit/src/test/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreterSpecKit.scala
@@ -40,7 +40,7 @@ import pekko.stream.testkit.Utils.TE
  * INTERNAL API
  */
 @InternalApi
-private[pekko] object NoMaterializer extends Materializer {
+private[pekko] case class NoMaterializer(override val system: ActorSystem) 
extends Materializer {
   override def withNamePrefix(name: String): Materializer =
     throw new UnsupportedOperationException("NoMaterializer cannot be named")
   override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat =
@@ -70,9 +70,6 @@ private[pekko] object NoMaterializer extends Materializer {
 
   override def isShutdown: Boolean = throw new 
UnsupportedOperationException("NoMaterializer cannot shutdown")
 
-  override def system: ActorSystem =
-    throw new UnsupportedOperationException("NoMaterializer does not have an 
actorsystem")
-
   override private[pekko] def logger = throw new 
UnsupportedOperationException("NoMaterializer does not have a logger")
 
   override private[pekko] def supervisor =
@@ -336,7 +333,7 @@ trait GraphInterpreterSpecKit extends StreamSpec {
       }
 
       _interpreter = new GraphInterpreter(
-        NoMaterializer,
+        NoMaterializer(system),
         logger,
         logics,
         connections,
diff --git a/stream/src/main/resources/reference.conf 
b/stream/src/main/resources/reference.conf
index e2ba88232c..bba71993ec 100644
--- a/stream/src/main/resources/reference.conf
+++ b/stream/src/main/resources/reference.conf
@@ -54,6 +54,11 @@ pekko {
       # Enable additional troubleshooting logging at DEBUG log level
       debug-logging = off
 
+      # Log any stream stage error at the specified log level: "error", 
"warning", "info", "debug" or "off".
+      # If there is a `pekko.stream.Attributes.LogLevels` attribute defined 
for a specific stream this value is ignored
+      # and the `onFailure` value of the attribute is applied instead.
+      stage-errors-default-log-level = error
+
       # Maximum number of elements emitted in batch if downstream signals 
large demand
       output-burst-limit = 1000
 
diff --git a/stream/src/main/scala/org/apache/pekko/stream/Attributes.scala 
b/stream/src/main/scala/org/apache/pekko/stream/Attributes.scala
index 838eedc52b..4f6d346c31 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/Attributes.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/Attributes.scala
@@ -26,6 +26,7 @@ import scala.reflect.{ classTag, ClassTag }
 import scala.util.control.NonFatal
 
 import org.apache.pekko
+import pekko.actor.ActorSystem
 import pekko.annotation.ApiMayChange
 import pekko.annotation.DoNotInherit
 import pekko.annotation.InternalApi
@@ -33,6 +34,7 @@ import pekko.event.Logging
 import pekko.japi.function
 import pekko.stream.impl.TraversalBuilder
 import pekko.util.ByteString
+import pekko.util.Helpers
 import pekko.util.LineNumbers
 
 /**
@@ -720,6 +722,23 @@ object Attributes {
 
     /** Use to enable logging at DEBUG level for certain operations when 
configuring [[Attributes#logLevels]] */
     final val Debug: Logging.LogLevel = Logging.DebugLevel
+
+    /** INTERNAL API */
+    @InternalApi
+    private[pekko] def defaultErrorLevel(system: ActorSystem): 
Logging.LogLevel =
+      
fromString(system.settings.config.getString("pekko.stream.materializer.stage-errors-default-log-level"))
+
+    /** INTERNAL API */
+    @InternalApi
+    private[pekko] def fromString(str: String): Logging.LogLevel = {
+      Helpers.toRootLowerCase(str) match {
+        case "off"     => Off
+        case "error"   => Error
+        case "warning" => Warning
+        case "info"    => Info
+        case "debug"   => Debug
+      }
+    }
   }
 
   /** Java API: Use to disable logging on certain operations when configuring 
[[Attributes#createLogLevels]] */
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala
 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala
index 43a05d946b..df3a36a39e 100644
--- 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala
+++ 
b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala
@@ -22,6 +22,7 @@ import org.apache.pekko
 import pekko.Done
 import pekko.actor.ActorRef
 import pekko.annotation.{ InternalApi, InternalStableApi }
+import pekko.event.Logging
 import pekko.event.LoggingAdapter
 import pekko.stream._
 import pekko.stream.Attributes.LogLevels
@@ -244,6 +245,8 @@ import pekko.stream.stage._
   private[this] val finalizedMark = Array.fill(logics.length)(false)
 
   private[this] var _subFusingMaterializer: Materializer = _
+  private[this] lazy val defaultErrorReportingLogLevel = 
LogLevels.defaultErrorLevel(materializer.system)
+
   def subFusingMaterializer: Materializer = _subFusingMaterializer
 
   // An event queue implemented as a circular buffer
@@ -378,12 +381,21 @@ import pekko.stream.stage._
         def reportStageError(e: Throwable): Unit = {
           if (activeStage eq null) throw e
           else {
-            val loggingEnabled = activeStage.attributes.get[LogLevels] match {
-              case Some(levels) => levels.onFailure != LogLevels.Off
-              case None         => true
+            val logAt: Logging.LogLevel = 
activeStage.attributes.get[LogLevels] match {
+              case Some(levels) => levels.onFailure
+              case None         => defaultErrorReportingLogLevel
+            }
+            logAt match {
+              case Logging.ErrorLevel =>
+                log.error(e, "Error in stage [{}]: {}", activeStage.toString, 
e.getMessage)
+              case Logging.WarningLevel =>
+                log.warning(e, "Error in stage [{}]: {}", 
activeStage.toString, e.getMessage)
+              case Logging.InfoLevel =>
+                log.info("Error in stage [{}]: {}", activeStage.toString, 
e.getMessage)
+              case Logging.DebugLevel =>
+                log.debug("Error in stage [{}]: {}", activeStage.toString, 
e.getMessage)
+              case _ => // Off, nop
             }
-            if (loggingEnabled)
-              log.error(e, "Error in stage [{}]: {}", activeStage.toString, 
e.getMessage)
             activeStage.failStage(e)
 
             // Abort chasing
diff --git 
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala 
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala
index 8a90aa3212..65f2ff9a4c 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/RestartFlow.scala
@@ -167,7 +167,9 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
 
   private def loggingEnabled = inheritedAttributes.get[LogLevels] match {
     case Some(levels) => levels.onFailure != LogLevels.Off
-    case None         => true
+    case None         =>
+      // Allows for system wide disable at least
+      LogLevels.defaultErrorLevel(materializer.system) != LogLevels.Off
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to