This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-http.git


The following commit(s) were added to refs/heads/main by this push:
     new 5b5a91cde fix: recover HTTP/2 handler errors as 500 (#1041)
5b5a91cde is described below

commit 5b5a91cde416b3a75d97f352fb3a96e0de470b44
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun May 24 20:49:31 2026 +0800

    fix: recover HTTP/2 handler errors as 500 (#1041)
    
    Motivation:
    Port upstream akka-http fix 02bb9872e1d391f53e21b663336e5052085a0b8c, which 
is now Apache licensed. HTTP/2 user handler failures should be converted into 
500 responses instead of leaking through the connection handling path.
    
    Modification:
    Wrap HTTP/2 handlers with NonFatal error recovery for failed futures and 
synchronous throws. Keep HTTP/1 fallback on the original handler so existing 
HTTP/1 error classification is preserved, and reject unknown HTTP/2 
pseudo-headers during request parsing so protocol errors are not converted to 
500 responses.
    
    Result:
    HTTP/2 handler failures now return InternalServerError while malformed 
pseudo-headers still produce HTTP/2 protocol errors. Added regression coverage 
for failed futures, synchronous throws, and verified h2spec pseudo-header 
behavior.
    
    References:
    Upstream: 
https://github.com/akka/akka-http/commit/02bb9872e1d391f53e21b663336e5052085a0b8c
---
 .../pekko/http/impl/engine/http2/Http2.scala       | 30 +++++++++++++++++++---
 .../http/impl/engine/http2/RequestParsing.scala    |  3 +++
 .../impl/engine/http2/Http2ClientServerSpec.scala  | 30 ++++++++++++++++++++--
 3 files changed, 57 insertions(+), 6 deletions(-)

diff --git 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala
index 2cdb6a42c..eaac91b91 100644
--- 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala
+++ 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala
@@ -90,11 +90,15 @@ private[http] final class Http2Ext(implicit val system: 
ActorSystem)
       else if (connectionContext.isSecure) settings.defaultHttpsPort
       else settings.defaultHttpPort
 
+    val handlerWithErrorHandling = withErrorHandling(log, handler)
+
     val http1: HttpImplementation =
-      
Flow[HttpRequest].mapAsync(settings.pipeliningLimit)(handleUpgradeRequests(handler,
 settings, log))
+      
Flow[HttpRequest].mapAsync(settings.pipeliningLimit)(handleUpgradeRequests(handler,
 handlerWithErrorHandling,
+        settings, log))
         .joinMat(GracefulTerminatorStage(system, 
settings).atop(http.serverLayer(settings, log = log)))(Keep.right)
     val http2: HttpImplementation =
-      
Http2Blueprint.handleWithStreamIdHeader(settings.http2Settings.maxConcurrentStreams)(handler)(system.dispatcher)
+      
Http2Blueprint.handleWithStreamIdHeader(settings.http2Settings.maxConcurrentStreams)(handlerWithErrorHandling)(
+        system.dispatcher)
         .joinMat(Http2Blueprint.serverStackTls(settings, log, telemetry, 
Http().dateHeaderRendering))(Keep.right)
 
     val masterTerminator = new MasterServerTerminator(log)
@@ -138,6 +142,23 @@ private[http] final class Http2Ext(implicit val system: 
ActorSystem)
       }.to(Sink.ignore).run()
   }
 
+  private def withErrorHandling(
+      log: LoggingAdapter,
+      handler: HttpRequest => Future[HttpResponse]): HttpRequest => 
Future[HttpResponse] = { request =>
+    try {
+      handler(request).recover {
+        case NonFatal(ex) => handleHandlerError(log, ex)
+      }(ExecutionContext.parasitic)
+    } catch {
+      case NonFatal(ex) => Future.successful(handleHandlerError(log, ex))
+    }
+  }
+
+  private def handleHandlerError(log: LoggingAdapter, ex: Throwable): 
HttpResponse = {
+    log.error(ex, "Internal server error, sending 500 response")
+    HttpResponse(StatusCodes.InternalServerError)
+  }
+
   private def prepareServerAttributes(settings: ServerSettings, incoming: 
Tcp.IncomingConnection) = {
     val attrs = Http.prepareAttributes(settings, incoming)
     if (telemetry == NoOpTelemetry) attrs
@@ -149,6 +170,7 @@ private[http] final class Http2Ext(implicit val system: 
ActorSystem)
 
   private def handleUpgradeRequests(
       handler: HttpRequest => Future[HttpResponse],
+      handlerWithErrorHandling: HttpRequest => Future[HttpResponse],
       settings: ServerSettings,
       log: LoggingAdapter): HttpRequest => Future[HttpResponse] = { req =>
     req.header[Upgrade] match {
@@ -172,8 +194,8 @@ private[http] final class Http2Ext(implicit val system: 
ActorSystem)
               Flow[HttpRequest]
                 .watchTermination(Keep.right)
                 .prepend(injectedRequest)
-                
.via(Http2Blueprint.handleWithStreamIdHeader(settings.http2Settings.maxConcurrentStreams)(handler)(
-                  system.dispatcher))
+                
.via(Http2Blueprint.handleWithStreamIdHeader(settings.http2Settings.maxConcurrentStreams)(
+                  handlerWithErrorHandling)(system.dispatcher))
                 // the settings from the header are injected into the 
blueprint as initial demuxer settings
                 .joinMat(Http2Blueprint.serverStack(settings, log, 
settingsFromHeader, true, telemetry,
                   Http().dateHeaderRendering))(Keep.left))
diff --git 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/RequestParsing.scala
 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/RequestParsing.scala
index 8b3f148b7..99e0c20be 100644
--- 
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/RequestParsing.scala
+++ 
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/RequestParsing.scala
@@ -160,6 +160,9 @@ private[http2] object RequestParsing {
             case ":status" =>
               protocolError("Pseudo-header ':status' is for responses only; it 
cannot appear in a request")
 
+            case name if name.startsWith(":") =>
+              protocolError(s"Unexpected pseudo-header '$name' in request")
+
             case "content-length" =>
               if (contentLength == -1) {
                 val contentLengthValue = ContentLength.get(value).toLong
diff --git 
a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientServerSpec.scala
 
b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientServerSpec.scala
index 96ddec9f6..6f718ec56 100644
--- 
a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientServerSpec.scala
+++ 
b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientServerSpec.scala
@@ -30,7 +30,6 @@ import pekko.http.scaladsl.model.{
   HttpHeader,
   HttpMethod,
   HttpMethods,
-  HttpProtocols,
   HttpRequest,
   HttpResponse,
   RequestResponseAssociation,
@@ -45,6 +44,8 @@ import pekko.http.scaladsl.unmarshalling.Unmarshal
 import pekko.stream.StreamTcpException
 import pekko.stream.scaladsl.{ Sink, Source }
 import pekko.stream.testkit.{ TestPublisher, TestSubscriber }
+import pekko.stream.testkit.Utils.TE
+import pekko.testkit.EventFilter
 import pekko.testkit.TestProbe
 import pekko.util.ByteString
 
@@ -144,6 +145,31 @@ class Http2ClientServerSpec extends 
PekkoSpecWithMaterializer(
       val response = expectClientResponse()
       response.status should be(StatusCodes.BadRequest)
     }
+
+    "return internal server error when handler future fails" in new TestSetup {
+      sendClientRequest()
+      val serverRequest = expectServerRequest()
+
+      EventFilter[TE](message = "boom", occurrences = 1).intercept {
+        serverRequest.promise.failure(TE("boom"))
+        val response = expectClientResponse()
+        response.status should be(StatusCodes.InternalServerError)
+      }
+
+      sendClientRequest()
+      val nextServerRequest = expectServerRequest()
+      nextServerRequest.sendResponse(HttpResponse())
+      expectClientResponse().status should be(StatusCodes.OK)
+    }
+
+    "return internal server error when handler throws synchronously" in new 
TestSetup {
+      override def handler: HttpRequest => Future[HttpResponse] = _ => throw 
TE("boom-sync")
+
+      EventFilter[TE](message = "boom-sync", occurrences = 1).intercept {
+        sendClientRequest()
+        expectClientResponse().status should 
be(StatusCodes.InternalServerError)
+      }
+    }
   }
 
   case class ServerRequest(request: HttpRequest, promise: 
Promise[HttpResponse]) {
@@ -169,7 +195,7 @@ class Http2ClientServerSpec extends 
PekkoSpecWithMaterializer(
     def serverSettings: ServerSettings = ServerSettings(system)
     def clientSettings: ClientConnectionSettings = 
ClientConnectionSettings(system)
     private lazy val serverRequestProbe = TestProbe()
-    private lazy val handler: HttpRequest => Future[HttpResponse] = { req =>
+    def handler: HttpRequest => Future[HttpResponse] = { req =>
       val p = Promise[HttpResponse]()
       serverRequestProbe.ref ! ServerRequest(req, p)
       p.future


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to