This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch 1.4.x
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git
The following commit(s) were added to refs/heads/1.4.x by this push:
new d686b0cae fix(aws-spi-pekko-http): preserve SDK Content-Length to
prevent chunked encoding breaking SigV4 signing (#1546) (#1560)
d686b0cae is described below
commit d686b0cae182a088808ba189acebeb5f74323da1
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Apr 8 14:42:41 2026 +0200
fix(aws-spi-pekko-http): preserve SDK Content-Length to prevent chunked
encoding breaking SigV4 signing (#1546) (#1560)
* Initial plan
* fix: preserve Content-Length from AWS SDK headers to prevent chunked
transfer encoding in S3 multipart uploads
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-connectors/sessions/5e1c413d-04aa-4dd9-b961-bb747147fe43
* scalafmt
* fix: use HttpEntity.Chunked.fromData when content length is unknown
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-connectors/sessions/71bfd062-b42b-4547-b302-04d169bdb895
* Revert "fix: use HttpEntity.Chunked.fromData when content length is
unknown"
This reverts commit 9155a0634e0d0ac4d3ce8e925ff6b816f0903129.
* Create modify-convert-headers.backwards.excludes
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../modify-convert-headers.backwards.excludes | 20 +++++++
.../stream/connectors/awsspi/PekkoHttpClient.scala | 65 +++++++++++++---------
.../connectors/awsspi/PekkoHttpClientSpec.scala | 50 ++++++++++++++++-
3 files changed, 107 insertions(+), 28 deletions(-)
diff --git
a/aws-spi-pekko-http/src/main/mima-filters/1.4.x.backward.excludes/modify-convert-headers.backwards.excludes
b/aws-spi-pekko-http/src/main/mima-filters/1.4.x.backward.excludes/modify-convert-headers.backwards.excludes
new file mode 100644
index 000000000..7b6b0a081
--- /dev/null
+++
b/aws-spi-pekko-http/src/main/mima-filters/1.4.x.backward.excludes/modify-convert-headers.backwards.excludes
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# convertHeaders and entityForMethodAndContentType are package private and were recently changed
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient.convertHeaders")
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient.entityForMethodAndContentType")
diff --git
a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala
b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala
index b3ad36746..060a2a415 100644
---
a/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala
+++
b/aws-spi-pekko-http/src/main/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClient.scala
@@ -80,22 +80,30 @@ object PekkoHttpClient {
private[awsspi] def toPekkoRequest(request: SdkHttpRequest,
contentPublisher: SdkHttpContentPublisher): HttpRequest = {
- val (contentTypeHeader, reqheaders) = convertHeaders(request.headers())
+ val (contentTypeHeader, reqheaders, sdkContentLength) =
convertHeaders(request.headers())
val method = convertMethod(request.method().name())
HttpRequest(
method = method,
uri = Uri(request.getUri.toString),
headers = reqheaders,
- entity =
- entityForMethodAndContentType(method,
contentTypeHeaderToContentType(contentTypeHeader), contentPublisher),
+ entity = entityForMethodAndContentType(method,
contentTypeHeaderToContentType(contentTypeHeader),
+ contentPublisher, sdkContentLength),
protocol = HttpProtocols.`HTTP/1.1`)
}
private[awsspi] def entityForMethodAndContentType(method: HttpMethod,
contentType: ContentType,
- contentPublisher: SdkHttpContentPublisher): RequestEntity =
+ contentPublisher: SdkHttpContentPublisher,
+ sdkContentLength: Option[Long] = None): RequestEntity =
method.requestEntityAcceptance match {
- case Expected => contentPublisher.contentLength().toScala match {
+ case Expected =>
+ // Prefer the content length from the SDK request headers over the publisher's value.
+ // This ensures that when the AWS SDK has set a Content-Length (which it always does for
+ // non-chunked-signing requests like UploadPart), Pekko HTTP sends a Content-Length entity
+ // rather than falling back to chunked transfer encoding, which would break AWS SigV4 signing.
+ val contentLength: Option[Long] =
+
sdkContentLength.orElse(contentPublisher.contentLength().toScala.map(_.toLong))
+ contentLength match {
case Some(length) =>
HttpEntity(contentType, length,
Source.fromPublisher(contentPublisher).map(ByteString(_)))
case None => HttpEntity(contentType,
Source.fromPublisher(contentPublisher).map(ByteString(_)))
@@ -121,35 +129,40 @@ object PekkoHttpClient {
//
.getOrElse(ContentTypes.NoContentType)
- // This method converts the headers to Akka-http headers and drops
content-length and returns content-type separately
+ // This method converts the headers to Pekko-http headers, drops
content-length (returning its value separately),
+ // and returns content-type separately
private[awsspi] def convertHeaders(
- headers: java.util.Map[String, java.util.List[String]]):
(Option[HttpHeader], immutable.Seq[HttpHeader]) = {
+ headers: java.util.Map[String, java.util.List[String]]):
(Option[HttpHeader], immutable.Seq[HttpHeader],
+ Option[Long]) = {
val headersAsScala = {
val builder = collection.mutable.Map.newBuilder[String,
java.util.List[String]]
headers.forEach { case (k, v) => builder += k -> v }
builder.result()
}
- headersAsScala.foldLeft((Option.empty[HttpHeader],
List.empty[HttpHeader])) { case ((ctHeader, hdrs), header) =>
- val (headerName, headerValue) = header
- if (headerValue.size() != 1) {
- throw new IllegalArgumentException(
- s"Found invalid header: key: $headerName, Value: ${val list =
List.newBuilder[String]
- headerValue.forEach(v => list += v)
- list.result()}.")
- }
- // skip content-length as it will be calculated by pekko-http itself and
must not be provided in the request headers
- if (`Content-Length`.lowercaseName == headerName.toLowerCase) (ctHeader,
hdrs)
- else {
- HttpHeader.parse(headerName, headerValue.get(0)) match {
- case ok: Ok =>
- // return content-type separately as it will be used to calculate
ContentType, which is used on HttpEntity
- if (ok.header.lowercaseName() == `Content-Type`.lowercaseName)
(Some(ok.header), hdrs)
- else (ctHeader, hdrs :+ ok.header)
- case error: ParsingResult.Error =>
- throw new IllegalArgumentException(s"Found invalid header:
${error.errors}.")
+ headersAsScala.foldLeft((Option.empty[HttpHeader], List.empty[HttpHeader],
Option.empty[Long])) {
+ case ((ctHeader, hdrs, contentLength), header) =>
+ val (headerName, headerValue) = header
+ if (headerValue.size() != 1) {
+ throw new IllegalArgumentException(
+ s"Found invalid header: key: $headerName, Value: ${val list =
List.newBuilder[String]
+ headerValue.forEach(v => list += v)
+ list.result()}.")
+ }
+ // skip content-length as it will be managed by pekko-http in the entity, but capture its value
+ // so we can use it to build a fixed-length entity, preventing a fallback to chunked transfer encoding
+ if (`Content-Length`.lowercaseName == headerName.toLowerCase)
+ (ctHeader, hdrs, Some(headerValue.get(0).toLong))
+ else {
+ HttpHeader.parse(headerName, headerValue.get(0)) match {
+ case ok: Ok =>
+ // return content-type separately as it will be used to
calculate ContentType, which is used on HttpEntity
+ if (ok.header.lowercaseName() == `Content-Type`.lowercaseName)
(Some(ok.header), hdrs, contentLength)
+ else (ctHeader, hdrs :+ ok.header, contentLength)
+ case error: ParsingResult.Error =>
+ throw new IllegalArgumentException(s"Found invalid header:
${error.errors}.")
+ }
}
- }
}
}
diff --git
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
index 8e93ddfee..e9377f93d 100644
---
a/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
+++
b/aws-spi-pekko-http/src/test/scala/org/apache/pekko/stream/connectors/awsspi/PekkoHttpClientSpec.scala
@@ -18,17 +18,20 @@
package org.apache.pekko.stream.connectors.awsspi
import java.util.Collections
+import java.nio.ByteBuffer
import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.http.scaladsl.model.headers.`Content-Type`
-import pekko.http.scaladsl.model.MediaTypes
+import pekko.http.scaladsl.model.{ ContentTypes, HttpMethods, MediaTypes }
import pekko.http.scaladsl.settings.{ ClientConnectionSettings,
ConnectionPoolSettings }
import pekko.util.JavaDurationConverters._
+import org.reactivestreams.Subscriber
import org.scalatest.OptionValues
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import software.amazon.awssdk.http.SdkHttpConfigurationOption
+import software.amazon.awssdk.http.async.SdkHttpContentPublisher
import software.amazon.awssdk.utils.AttributeMap
import scala.concurrent.duration._
@@ -49,11 +52,54 @@ class PekkoHttpClientSpec extends AnyWordSpec with Matchers
with OptionValues {
headers.put("Content-Length", Collections.singletonList("123"))
headers.put("Accept", Collections.singletonList("*/*"))
- val (contentTypeHeader, reqHeaders) =
PekkoHttpClient.convertHeaders(headers)
+ val (contentTypeHeader, reqHeaders, contentLength) =
PekkoHttpClient.convertHeaders(headers)
contentTypeHeader.value.lowercaseName() shouldBe
`Content-Type`.lowercaseName
reqHeaders should have size 1
+ contentLength shouldBe Some(123L)
}
+
+ "return None content length when Content-Length header is absent" in {
+ val headers = new java.util.HashMap[String, java.util.List[String]]
+ headers.put("Content-Type", Collections.singletonList("application/xml"))
+ headers.put("Accept", Collections.singletonList("*/*"))
+
+ val (_, _, contentLength) = PekkoHttpClient.convertHeaders(headers)
+
+ contentLength shouldBe None
+ }
+ "use sdk content length from headers when publisher returns empty
contentLength" in {
+ val publisher = new SdkHttpContentPublisher {
+ override def contentLength(): java.util.Optional[java.lang.Long] =
java.util.Optional.empty()
+ override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = {}
+ }
+ val entity =
+ PekkoHttpClient.entityForMethodAndContentType(HttpMethods.PUT,
ContentTypes.NoContentType, publisher,
+ Some(42L))
+ entity.contentLengthOption shouldBe Some(42L)
+ }
+
+ "use publisher contentLength when sdkContentLength is absent" in {
+ val publisher = new SdkHttpContentPublisher {
+ override def contentLength(): java.util.Optional[java.lang.Long] =
java.util.Optional.of(99L)
+ override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = {}
+ }
+ val entity =
+ PekkoHttpClient.entityForMethodAndContentType(HttpMethods.PUT,
ContentTypes.NoContentType, publisher, None)
+ entity.contentLengthOption shouldBe Some(99L)
+ }
+
+ "prefer sdk content length over publisher contentLength when both are
present" in {
+ val publisher = new SdkHttpContentPublisher {
+ override def contentLength(): java.util.Optional[java.lang.Long] =
java.util.Optional.of(55L)
+ override def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = {}
+ }
+ val entity =
+ PekkoHttpClient.entityForMethodAndContentType(HttpMethods.PUT,
ContentTypes.NoContentType, publisher,
+ Some(42L))
+ entity.contentLengthOption shouldBe Some(42L)
+ }
+
"build() should use default ConnectionPoolSettings" in {
val pekkoClient: PekkoHttpClient = new
PekkoHttpAsyncHttpService().createAsyncHttpClientFactory()
.build()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]