This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-http.git
The following commit(s) were added to refs/heads/main by this push:
new 5b5a91cde fix: recover HTTP/2 handler errors as 500 (#1041)
5b5a91cde is described below
commit 5b5a91cde416b3a75d97f352fb3a96e0de470b44
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sun May 24 20:49:31 2026 +0800
fix: recover HTTP/2 handler errors as 500 (#1041)
Motivation:
Port upstream akka-http fix 02bb9872e1d391f53e21b663336e5052085a0b8c, which
is now Apache licensed. HTTP/2 user handler failures should be converted into
500 responses instead of leaking through the connection handling path.
Modification:
Wrap HTTP/2 handlers with NonFatal error recovery for failed futures and
synchronous throws. Keep the HTTP/1 fallback on the original handler so that
existing HTTP/1 error classification is preserved. Also reject unknown HTTP/2
pseudo-headers during request parsing so that protocol errors are not
converted to 500 responses.
Result:
HTTP/2 handler failures now return InternalServerError, while malformed
pseudo-headers still produce HTTP/2 protocol errors. Added regression coverage
for failed futures and synchronous throws, and verified h2spec pseudo-header
behavior.
References:
Upstream:
https://github.com/akka/akka-http/commit/02bb9872e1d391f53e21b663336e5052085a0b8c
---
.../pekko/http/impl/engine/http2/Http2.scala | 30 +++++++++++++++++++---
.../http/impl/engine/http2/RequestParsing.scala | 3 +++
.../impl/engine/http2/Http2ClientServerSpec.scala | 30 ++++++++++++++++++++--
3 files changed, 57 insertions(+), 6 deletions(-)
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala
index 2cdb6a42c..eaac91b91 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala
@@ -90,11 +90,15 @@ private[http] final class Http2Ext(implicit val system:
ActorSystem)
else if (connectionContext.isSecure) settings.defaultHttpsPort
else settings.defaultHttpPort
+ val handlerWithErrorHandling = withErrorHandling(log, handler)
+
val http1: HttpImplementation =
-
Flow[HttpRequest].mapAsync(settings.pipeliningLimit)(handleUpgradeRequests(handler,
settings, log))
+
Flow[HttpRequest].mapAsync(settings.pipeliningLimit)(handleUpgradeRequests(handler,
handlerWithErrorHandling,
+ settings, log))
.joinMat(GracefulTerminatorStage(system,
settings).atop(http.serverLayer(settings, log = log)))(Keep.right)
val http2: HttpImplementation =
-
Http2Blueprint.handleWithStreamIdHeader(settings.http2Settings.maxConcurrentStreams)(handler)(system.dispatcher)
+
Http2Blueprint.handleWithStreamIdHeader(settings.http2Settings.maxConcurrentStreams)(handlerWithErrorHandling)(
+ system.dispatcher)
.joinMat(Http2Blueprint.serverStackTls(settings, log, telemetry,
Http().dateHeaderRendering))(Keep.right)
val masterTerminator = new MasterServerTerminator(log)
@@ -138,6 +142,23 @@ private[http] final class Http2Ext(implicit val system:
ActorSystem)
}.to(Sink.ignore).run()
}
+ private def withErrorHandling(
+ log: LoggingAdapter,
+ handler: HttpRequest => Future[HttpResponse]): HttpRequest =>
Future[HttpResponse] = { request =>
+ try {
+ handler(request).recover {
+ case NonFatal(ex) => handleHandlerError(log, ex)
+ }(ExecutionContext.parasitic)
+ } catch {
+ case NonFatal(ex) => Future.successful(handleHandlerError(log, ex))
+ }
+ }
+
+ private def handleHandlerError(log: LoggingAdapter, ex: Throwable):
HttpResponse = {
+ log.error(ex, "Internal server error, sending 500 response")
+ HttpResponse(StatusCodes.InternalServerError)
+ }
+
private def prepareServerAttributes(settings: ServerSettings, incoming:
Tcp.IncomingConnection) = {
val attrs = Http.prepareAttributes(settings, incoming)
if (telemetry == NoOpTelemetry) attrs
@@ -149,6 +170,7 @@ private[http] final class Http2Ext(implicit val system:
ActorSystem)
private def handleUpgradeRequests(
handler: HttpRequest => Future[HttpResponse],
+ handlerWithErrorHandling: HttpRequest => Future[HttpResponse],
settings: ServerSettings,
log: LoggingAdapter): HttpRequest => Future[HttpResponse] = { req =>
req.header[Upgrade] match {
@@ -172,8 +194,8 @@ private[http] final class Http2Ext(implicit val system:
ActorSystem)
Flow[HttpRequest]
.watchTermination(Keep.right)
.prepend(injectedRequest)
-
.via(Http2Blueprint.handleWithStreamIdHeader(settings.http2Settings.maxConcurrentStreams)(handler)(
- system.dispatcher))
+
.via(Http2Blueprint.handleWithStreamIdHeader(settings.http2Settings.maxConcurrentStreams)(
+ handlerWithErrorHandling)(system.dispatcher))
// the settings from the header are injected into the
blueprint as initial demuxer settings
.joinMat(Http2Blueprint.serverStack(settings, log,
settingsFromHeader, true, telemetry,
Http().dateHeaderRendering))(Keep.left))
diff --git
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/RequestParsing.scala
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/RequestParsing.scala
index 8b3f148b7..99e0c20be 100644
---
a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/RequestParsing.scala
+++
b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/RequestParsing.scala
@@ -160,6 +160,9 @@ private[http2] object RequestParsing {
case ":status" =>
protocolError("Pseudo-header ':status' is for responses only; it
cannot appear in a request")
+ case name if name.startsWith(":") =>
+ protocolError(s"Unexpected pseudo-header '$name' in request")
+
case "content-length" =>
if (contentLength == -1) {
val contentLengthValue = ContentLength.get(value).toLong
diff --git
a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientServerSpec.scala
b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientServerSpec.scala
index 96ddec9f6..6f718ec56 100644
---
a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientServerSpec.scala
+++
b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientServerSpec.scala
@@ -30,7 +30,6 @@ import pekko.http.scaladsl.model.{
HttpHeader,
HttpMethod,
HttpMethods,
- HttpProtocols,
HttpRequest,
HttpResponse,
RequestResponseAssociation,
@@ -45,6 +44,8 @@ import pekko.http.scaladsl.unmarshalling.Unmarshal
import pekko.stream.StreamTcpException
import pekko.stream.scaladsl.{ Sink, Source }
import pekko.stream.testkit.{ TestPublisher, TestSubscriber }
+import pekko.stream.testkit.Utils.TE
+import pekko.testkit.EventFilter
import pekko.testkit.TestProbe
import pekko.util.ByteString
@@ -144,6 +145,31 @@ class Http2ClientServerSpec extends
PekkoSpecWithMaterializer(
val response = expectClientResponse()
response.status should be(StatusCodes.BadRequest)
}
+
+ "return internal server error when handler future fails" in new TestSetup {
+ sendClientRequest()
+ val serverRequest = expectServerRequest()
+
+ EventFilter[TE](message = "boom", occurrences = 1).intercept {
+ serverRequest.promise.failure(TE("boom"))
+ val response = expectClientResponse()
+ response.status should be(StatusCodes.InternalServerError)
+ }
+
+ sendClientRequest()
+ val nextServerRequest = expectServerRequest()
+ nextServerRequest.sendResponse(HttpResponse())
+ expectClientResponse().status should be(StatusCodes.OK)
+ }
+
+ "return internal server error when handler throws synchronously" in new
TestSetup {
+ override def handler: HttpRequest => Future[HttpResponse] = _ => throw
TE("boom-sync")
+
+ EventFilter[TE](message = "boom-sync", occurrences = 1).intercept {
+ sendClientRequest()
+ expectClientResponse().status should
be(StatusCodes.InternalServerError)
+ }
+ }
}
case class ServerRequest(request: HttpRequest, promise:
Promise[HttpResponse]) {
@@ -169,7 +195,7 @@ class Http2ClientServerSpec extends
PekkoSpecWithMaterializer(
def serverSettings: ServerSettings = ServerSettings(system)
def clientSettings: ClientConnectionSettings =
ClientConnectionSettings(system)
private lazy val serverRequestProbe = TestProbe()
- private lazy val handler: HttpRequest => Future[HttpResponse] = { req =>
+ def handler: HttpRequest => Future[HttpResponse] = { req =>
val p = Promise[HttpResponse]()
serverRequestProbe.ref ! ServerRequest(req, p)
p.future
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]