This is an automated email from the ASF dual-hosted git repository.

mdedetrich pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/pekko-connectors.git


The following commit(s) were added to refs/heads/1.3.x by this push:
     new a93fdc80c AWS S3: Handle Illegal Headers in Copy Part
a93fdc80c is described below

commit a93fdc80cafad890c149b916a5e9f92351428a22
Author: Luka J <[email protected]>
AuthorDate: Fri Nov 28 09:36:08 2025 +0100

    AWS S3: Handle Illegal Headers in Copy Part
    
    (cherry picked from commit 53d7b1dabbd50aface0a89b51a0c20ea1624630e)
---
 s3/src/main/resources/reference.conf               | 210 +++++++++++++++++++++
 .../pekko/stream/connectors/s3/S3Headers.scala     |   6 +-
 .../stream/connectors/s3/impl/S3Request.scala      |  97 ++++++++--
 .../pekko/stream/connectors/s3/impl/S3Stream.scala |  60 +++---
 .../pekko/stream/connectors/s3/settings.scala      | 112 +++++++++--
 s3/src/test/scala/docs/scaladsl/S3SinkSpec.scala   |  14 +-
 .../pekko/stream/connectors/s3/S3HeadersSpec.scala | 113 +++++++++++
 .../stream/connectors/s3/S3SettingsSpec.scala      |  59 ++++++
 .../connectors/s3/impl/HttpRequestsSpec.scala      |   2 +-
 .../stream/connectors/s3/impl/S3StreamSpec.scala   |   4 +-
 10 files changed, 615 insertions(+), 62 deletions(-)

diff --git a/s3/src/main/resources/reference.conf 
b/s3/src/main/resources/reference.conf
index d24cd82ee..4521e6475 100644
--- a/s3/src/main/resources/reference.conf
+++ b/s3/src/main/resources/reference.conf
@@ -125,4 +125,214 @@ pekko.connectors.s3 {
 
   # Add signature headers to requests when aws.credentials.provider is anon
   sign-anonymous-requests = true
+
+  # An allow list of headers for each S3Request type as defined by the AWS 
specification - this is merged with additional-allowed-headers
+  allowed-headers {
+    # 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html#API_GetObject_RequestSyntax
+    GetObject = [
+      "Host",
+      "If-Match",
+      "If-Modified-Since",
+      "If-None-Match",
+      "If-Unmodified-Since",
+      "Range",
+      "x-amz-server-side-encryption-customer-algorithm",
+      "x-amz-server-side-encryption-customer-key",
+      "x-amz-server-side-encryption-customer-key-MD5",
+      "x-amz-request-payer",
+      "x-amz-expected-bucket-owner",
+      "x-amz-checksum-mode"
+    ]
+    # 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_RequestSyntax
+    HeadObject = [
+      "Host",
+      "If-Match",
+      "If-Modified-Since",
+      "If-None-Match",
+      "If-Unmodified-Since",
+      "Range",
+      "x-amz-server-side-encryption-customer-algorithm",
+      "x-amz-server-side-encryption-customer-key",
+      "x-amz-server-side-encryption-customer-key-MD5",
+      "x-amz-request-payer",
+      "x-amz-expected-bucket-owner",
+      "x-amz-checksum-mode"
+    ]
+    # 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#API_PutObject_RequestSyntax
+    PutObject = [
+      "Host",
+      "x-amz-acl",
+      "Cache-Control",
+      "Content-Disposition",
+      "Content-Encoding",
+      "Content-Language",
+      "Content-Length",
+      "Content-MD5",
+      "Content-Type",
+      "x-amz-sdk-checksum-algorithm",
+      "x-amz-checksum-crc32",
+      "x-amz-checksum-crc32c",
+      "x-amz-checksum-crc64nvme",
+      "x-amz-checksum-sha1",
+      "x-amz-checksum-sha256",
+      "Expires",
+      "If-Match",
+      "If-None-Match",
+      "x-amz-grant-full-control",
+      "x-amz-grant-read",
+      "x-amz-grant-read-acp",
+      "x-amz-grant-write-acp",
+      "x-amz-write-offset-bytes",
+      "x-amz-server-side-encryption",
+      "x-amz-storage-class",
+      "x-amz-website-redirect-location",
+      "x-amz-server-side-encryption-customer-algorithm",
+      "x-amz-server-side-encryption-customer-key",
+      "x-amz-server-side-encryption-customer-key-MD5",
+      "x-amz-server-side-encryption-aws-kms-key-id",
+      "x-amz-server-side-encryption-context",
+      "x-amz-server-side-encryption-bucket-key-enabled",
+      "x-amz-request-payer",
+      "x-amz-tagging",
+      "x-amz-object-lock-mode",
+      "x-amz-object-lock-retain-until-date",
+      "x-amz-object-lock-legal-hold",
+      "x-amz-expected-bucket-owner"
+    ]
+    # 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html#API_CreateMultipartUpload_RequestSyntax
+    InitiateMultipartUpload = [
+      "Host",
+      "x-amz-acl",
+      "Cache-Control",
+      "Content-Disposition",
+      "Content-Encoding",
+      "Content-Language",
+      "Content-Type",
+      "Expires",
+      "x-amz-grant-full-control",
+      "x-amz-grant-read",
+      "x-amz-grant-read-acp",
+      "x-amz-grant-write-acp",
+      "x-amz-server-side-encryption",
+      "x-amz-storage-class",
+      "x-amz-website-redirect-location",
+      "x-amz-server-side-encryption-customer-algorithm",
+      "x-amz-server-side-encryption-customer-key",
+      "x-amz-server-side-encryption-customer-key-MD5",
+      "x-amz-server-side-encryption-aws-kms-key-id",
+      "x-amz-server-side-encryption-context",
+      "x-amz-server-side-encryption-bucket-key-enabled",
+      "x-amz-request-payer",
+      "x-amz-tagging",
+      "x-amz-object-lock-mode",
+      "x-amz-object-lock-retain-until-date",
+      "x-amz-object-lock-legal-hold",
+      "x-amz-expected-bucket-owner",
+      "x-amz-checksum-algorithm",
+      "x-amz-checksum-type"
+    ]
+    # 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html#API_UploadPart_RequestSyntax
 
+    UploadPart = [
+      "Host",
+      "Content-Length",
+      "Content-MD5",
+      "x-amz-sdk-checksum-algorithm",
+      "x-amz-checksum-crc32",
+      "x-amz-checksum-crc32c",
+      "x-amz-checksum-crc64nvme",
+      "x-amz-checksum-sha1",
+      "x-amz-checksum-sha256",
+      "x-amz-server-side-encryption-customer-algorithm",
+      "x-amz-server-side-encryption-customer-key",
+      "x-amz-server-side-encryption-customer-key-MD5",
+      "x-amz-request-payer",
+      "x-amz-expected-bucket-owner"
+    ]
+    # 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html#API_UploadPartCopy_RequestSyntax
+    CopyPart = [
+      "Host",
+      "x-amz-copy-source",
+      "x-amz-copy-source-if-match",
+      "x-amz-copy-source-if-modified-since",
+      "x-amz-copy-source-if-none-match",
+      "x-amz-copy-source-if-unmodified-since",
+      "x-amz-copy-source-range",
+      "x-amz-server-side-encryption-customer-algorithm",
+      "x-amz-server-side-encryption-customer-key",
+      "x-amz-server-side-encryption-customer-key-MD5",
+      "x-amz-copy-source-server-side-encryption-customer-algorithm",
+      "x-amz-copy-source-server-side-encryption-customer-key",
+      "x-amz-copy-source-server-side-encryption-customer-key-MD5",
+      "x-amz-request-payer",
+      "x-amz-expected-bucket-owner",
+      "x-amz-source-expected-bucket-owner"
+    ]
+    # 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html#API_DeleteObject_RequestSyntax
+    DeleteObject = [
+      "Host",
+      "x-amz-mfa",
+      "x-amz-request-payer",
+      "x-amz-bypass-governance-retention",
+      "x-amz-expected-bucket-owner",
+      "If-Match",
+      "x-amz-if-match-last-modified-time",
+      "x-amz-if-match-size"
+    ]
+    # 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListBuckets.html#API_ListBuckets_RequestSyntax
+    ListBucket = [
+      "Host"
+    ]
+    # 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateBucket.html#API_CreateBucket_RequestSyntax
+    MakeBucket = [
+      "Host",
+      "x-amz-acl",
+      "x-amz-grant-full-control",
+      "x-amz-grant-read",
+      "x-amz-grant-read-acp",
+      "x-amz-grant-write",
+      "x-amz-grant-write-acp",
+      "x-amz-bucket-object-lock-enabled",
+      "x-amz-object-ownership"
+    ]
+    # 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucket.html#API_DeleteBucket_RequestSyntax
+    DeleteBucket = [
+      "Host",
+      "x-amz-expected-bucket-owner"
+    ]
+    # 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html#API_HeadBucket_RequestSyntax
+    CheckBucket = [
+      "Host",
+      "x-amz-expected-bucket-owner"
+    ]
+    # 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_control_PutBucketVersioning.html#API_control_PutBucketVersioning_RequestSyntax
+    PutBucketVersioning = [
+      "Host",
+      "Content-MD5",
+      "x-amz-sdk-checksum-algorithm",
+      "x-amz-mfa",
+      "x-amz-expected-bucket-owner"
+    ]
+    # 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_control_GetBucketVersioning.html#API_control_GetBucketVersioning_RequestSyntax
+    GetBucketVersioning = [
+      "Host",
+      "x-amz-expected-bucket-owner"
+    ]
+  }
+
+  # An additional allow list for each S3Request type for cases where 
non-standard request headers are needed or the S3 specification evolves to include 
new headers
+  additional-allowed-headers {
+    GetObject = []
+    HeadObject = []
+    PutObject = []
+    InitiateMultipartUpload = []
+    UploadPart = []
+    CopyPart = []
+    DeleteObject = []
+    ListBucket = []
+    MakeBucket = []
+    DeleteBucket = []
+    CheckBucket = []
+    PutBucketVersioning = []
+    GetBucketVersioning = []
+  }
 }
diff --git 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/S3Headers.scala 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/S3Headers.scala
index 7ab95395b..71cdb126d 100644
--- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/S3Headers.scala
+++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/S3Headers.scala
@@ -76,8 +76,10 @@ final class S3Headers private (val cannedAcl: 
Option[CannedAcl] = None,
       RawHeader(header._1, header._2)
     }
 
-  @InternalApi private[s3] def headersFor(request: S3Request) =
-    headers ++ serverSideEncryption.toIndexedSeq.flatMap(_.headersFor(request))
+  @InternalApi private[s3] def headersFor(request: S3Request)(implicit 
s3Settings: S3Settings) =
+    headers.filter(header =>
+      s3Settings.concreteAllowedHeaders.getOrElse(request, Set.empty).contains(
+        header.name())) ++ 
serverSideEncryption.toIndexedSeq.flatMap(_.headersFor(request))
 
   def withCannedAcl(cannedAcl: CannedAcl): S3Headers = copy(cannedAcl = 
Some(cannedAcl))
   def withMetaHeaders(metaHeaders: MetaHeaders): S3Headers = copy(metaHeaders 
= Some(metaHeaders))
diff --git 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Request.scala 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Request.scala
index 19e3858f2..4789ca19f 100644
--- 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Request.scala
+++ 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Request.scala
@@ -13,7 +13,10 @@
 
 package org.apache.pekko.stream.connectors.s3.impl
 
-import org.apache.pekko.annotation.InternalApi
+import org.apache.pekko
+
+import pekko.annotation.InternalApi
+import pekko.util.OptionVal
 
 /**
  * Internal Api
@@ -23,64 +26,130 @@ import org.apache.pekko.annotation.InternalApi
 /**
  * Internal Api
  */
-@InternalApi private[s3] case object GetObject extends S3Request
+@InternalApi private[s3] object S3Request {
+  def fromString(str: String): OptionVal[S3Request] = {
+    str match {
+      case "GetObject"               => OptionVal(GetObject)
+      case "HeadObject"              => OptionVal(HeadObject)
+      case "PutObject"               => OptionVal(PutObject)
+      case "InitiateMultipartUpload" => OptionVal(InitiateMultipartUpload)
+      case "UploadPart"              => OptionVal(UploadPart)
+      case "CopyPart"                => OptionVal(CopyPart)
+      case "DeleteObject"            => OptionVal(DeleteObject)
+      case "ListBucket"              => OptionVal(ListBucket)
+      case "MakeBucket"              => OptionVal(MakeBucket)
+      case "DeleteBucket"            => OptionVal(DeleteBucket)
+      case "CheckBucket"             => OptionVal(CheckBucket)
+      case "PutBucketVersioning"     => OptionVal(PutBucketVersioning)
+      case "GetBucketVersioning"     => OptionVal(GetBucketVersioning)
+      case _                         => OptionVal.None
+    }
+  }
+
+  val allRequests: List[S3Request] = List(
+    GetObject,
+    HeadObject,
+    PutObject,
+    InitiateMultipartUpload,
+    UploadPart,
+    CopyPart,
+    DeleteObject,
+    ListBucket,
+    MakeBucket,
+    DeleteBucket,
+    CheckBucket,
+    PutBucketVersioning,
+    GetBucketVersioning
+  )
+}
+
+/**
+ * Internal Api
+ */
+@InternalApi private[s3] case object GetObject extends S3Request {
+  override def toString() = "GetObject"
+}
 
 /**
  * Internal Api
  */
-@InternalApi private[s3] case object HeadObject extends S3Request
+@InternalApi private[s3] case object HeadObject extends S3Request {
+  override def toString() = "HeadObject"
+}
 
 /**
  * Internal Api
  */
-@InternalApi private[s3] case object PutObject extends S3Request
+@InternalApi private[s3] case object PutObject extends S3Request {
+  override def toString() = "PutObject"
+}
 
 /**
  * Internal Api
  */
-@InternalApi private[s3] case object InitiateMultipartUpload extends S3Request
+@InternalApi private[s3] case object InitiateMultipartUpload extends S3Request 
{
+  override def toString() = "InitiateMultipartUpload"
+}
 
 /**
  * Internal Api
  */
-@InternalApi private[s3] case object UploadPart extends S3Request
+@InternalApi private[s3] case object UploadPart extends S3Request {
+  override def toString() = "UploadPart"
+}
 
 /**
  * Internal Api
  */
-@InternalApi private[s3] case object CopyPart extends S3Request
+@InternalApi private[s3] case object CopyPart extends S3Request {
+  override def toString() = "CopyPart"
+}
 
 /**
  * Internal Api
  */
-@InternalApi private[s3] case object DeleteObject extends S3Request
+@InternalApi private[s3] case object DeleteObject extends S3Request {
+  override def toString() = "DeleteObject"
+}
 
 /**
  * Internal Api
  */
-@InternalApi private[s3] case object ListBucket extends S3Request
+@InternalApi private[s3] case object ListBucket extends S3Request {
+  override def toString() = "ListBucket"
+}
 
 /**
  * Internal Api
  */
-@InternalApi private[s3] case object MakeBucket extends S3Request
+@InternalApi private[s3] case object MakeBucket extends S3Request {
+  override def toString() = "MakeBucket"
+}
 
 /**
  * Internal Api
  */
-@InternalApi private[s3] case object DeleteBucket extends S3Request
+@InternalApi private[s3] case object DeleteBucket extends S3Request {
+  override def toString() = "DeleteBucket"
+}
 
 /**
  * Internal Api
  */
-@InternalApi private[s3] case object CheckBucket extends S3Request
+@InternalApi private[s3] case object CheckBucket extends S3Request {
+  override def toString() = "CheckBucket"
+}
 
 /**
  * Internal Api
  */
-@InternalApi private[s3] case object PutBucketVersioning extends S3Request
+@InternalApi private[s3] case object PutBucketVersioning extends S3Request {
+  override def toString() = "PutBucketVersioning"
+}
 
 /**
  * Internal Api
  */
-@InternalApi private[s3] case object GetBucketVersioning extends S3Request
+@InternalApi private[s3] case object GetBucketVersioning extends S3Request {
+  override def toString() = "GetBucketVersioning"
+}
diff --git 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
index 89341f9a4..1b080e24a 100644
--- 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
+++ 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
@@ -187,11 +187,12 @@ import scala.util.{ Failure, Success, Try }
       range: Option[ByteRange],
       versionId: Option[String],
       s3Headers: S3Headers): Source[Option[(Source[ByteString, NotUsed], 
ObjectMetadata)], NotUsed] = {
-    val headers = s3Headers.headersFor(GetObject)
 
     Source
       .fromMaterializer { (mat, attr) =>
         implicit val materializer: Materializer = mat
+        implicit val conf: S3Settings = resolveSettings(attr, mat.system)
+        val headers = s3Headers.headersFor(GetObject)
         issueRequest(s3Location, rangeOption = range, versionId = versionId, 
s3Headers = headers)(mat, attr)
           .map(response => 
response.withEntity(response.entity.withoutSizeLimit))
           .mapAsync(parallelism = 1)(entityForSuccess)
@@ -210,12 +211,13 @@ import scala.util.{ Failure, Success, Try }
       range: Option[ByteRange],
       versionId: Option[String],
       s3Headers: S3Headers): Source[ByteString, Future[ObjectMetadata]] = {
-    val headers = s3Headers.headersFor(GetObject)
 
     Source
       .fromMaterializer { (mat, attr) =>
         val objectMetadataMat = Promise[ObjectMetadata]()
         implicit val materializer: Materializer = mat
+        implicit val conf: S3Settings = resolveSettings(attr, mat.system)
+        val headers = s3Headers.headersFor(GetObject)
         issueRequest(s3Location, rangeOption = range, versionId = versionId, 
s3Headers = headers)(mat, attr)
           .map(response => 
response.withEntity(response.entity.withoutSizeLimit))
           .mapAsync(parallelism = 1)(entityForSuccess)
@@ -580,6 +582,7 @@ import scala.util.{ Failure, Success, Try }
       .fromMaterializer { (mat, attr) =>
         implicit val materializer: Materializer = mat
         import mat.executionContext
+        implicit val conf: S3Settings = resolveSettings(attr, mat.system)
         val headers = s3Headers.headersFor(HeadObject)
         issueRequest(S3Location(bucket, key), HttpMethods.HEAD, versionId = 
versionId, s3Headers = headers)(mat, attr)
           .flatMapConcat {
@@ -610,7 +613,7 @@ import scala.util.{ Failure, Success, Try }
     Source
       .fromMaterializer { (mat, attr) =>
         implicit val m: Materializer = mat
-
+        implicit val conf: S3Settings = resolveSettings(attr, mat.system)
         val headers = s3Headers.headersFor(DeleteObject)
         issueRequest(s3Location, HttpMethods.DELETE, versionId = versionId, 
s3Headers = headers)(mat, attr)
           .flatMapConcat {
@@ -659,8 +662,6 @@ import scala.util.{ Failure, Success, Try }
     // TODO can we take in a Source[ByteString, NotUsed] without forcing 
chunking
     // chunked requests are causing S3 to think this is a multipart upload
 
-    val headers = s3Headers.headersFor(PutObject)
-
     Source
       .fromMaterializer { (mat, attr) =>
         implicit val materializer: Materializer = mat
@@ -668,6 +669,7 @@ import scala.util.{ Failure, Success, Try }
         import mat.executionContext
         implicit val sys: ActorSystem = mat.system
         implicit val conf: S3Settings = resolveSettings(attr, mat.system)
+        val headers = s3Headers.headersFor(PutObject)
 
         val req = uploadRequest(s3Location, data, contentLength, contentType, 
headers)
 
@@ -717,8 +719,9 @@ import scala.util.{ Failure, Success, Try }
       case _           => downloadRequest
     }
 
-  private def bucketManagementRequest(bucket: String)(method: HttpMethod, 
conf: S3Settings): HttpRequest =
-    HttpRequests.bucketManagementRequest(S3Location(bucket, key = ""), 
method)(conf)
+  private def bucketManagementRequest(bucket: String, headers: S3Headers, 
request: S3Request)(method: HttpMethod,
+      conf: S3Settings): HttpRequest =
+    HttpRequests.bucketManagementRequest(S3Location(bucket, key = ""), method, 
headers.headersFor(request)(conf))(conf)
 
   def makeBucketSource(bucket: String, headers: S3Headers): Source[Done, 
NotUsed] = {
     Source
@@ -738,8 +741,7 @@ import scala.util.{ Failure, Success, Try }
         s3ManagementRequest[Done](
           bucket = bucket,
           method = HttpMethods.PUT,
-          httpRequest = bucketManagementRequest(bucket),
-          headers.headersFor(MakeBucket),
+          httpRequest = bucketManagementRequest(bucket, headers, MakeBucket),
           process = processS3LifecycleResponse,
           httpEntity = maybeRegionPayload)
       }
@@ -753,9 +755,9 @@ import scala.util.{ Failure, Success, Try }
     s3ManagementRequest[Done](
       bucket = bucket,
       method = HttpMethods.DELETE,
-      httpRequest = bucketManagementRequest(bucket),
-      headers.headersFor(DeleteBucket),
-      process = processS3LifecycleResponse)
+      httpRequest = bucketManagementRequest(bucket, headers, DeleteBucket),
+      process = processS3LifecycleResponse
+    )
 
   def deleteBucket(bucket: String, headers: S3Headers)(implicit mat: 
Materializer, attr: Attributes): Future[Done] =
     deleteBucketSource(bucket, 
headers).withAttributes(attr).runWith(Sink.ignore)
@@ -764,42 +766,42 @@ import scala.util.{ Failure, Success, Try }
     s3ManagementRequest[BucketAccess](
       bucket = bucketName,
       method = HttpMethods.HEAD,
-      httpRequest = bucketManagementRequest(bucketName),
-      headers.headersFor(CheckBucket),
+      httpRequest = bucketManagementRequest(bucketName, headers, CheckBucket),
       process = processCheckIfExistsResponse)
 
   def checkIfBucketExists(bucket: String, headers: S3Headers)(implicit mat: 
Materializer,
       attr: Attributes): Future[BucketAccess] =
     checkIfBucketExistsSource(bucket, 
headers).withAttributes(attr).runWith(Sink.head)
 
-  private def uploadManagementRequest(bucket: String, key: String, uploadId: 
String)(method: HttpMethod,
+  private def uploadManagementRequest(bucket: String, key: String, uploadId: 
String, s3Headers: S3Headers,
+      s3Request: S3Request)(method: HttpMethod,
       conf: S3Settings): HttpRequest =
-    HttpRequests.uploadManagementRequest(S3Location(bucket, key), uploadId, 
method)(conf)
+    HttpRequests.uploadManagementRequest(S3Location(bucket, key), uploadId, 
method,
+      s3Headers.headersFor(s3Request)(conf))(conf)
 
   def deleteUploadSource(bucket: String, key: String, uploadId: String, 
headers: S3Headers): Source[Done, NotUsed] =
     s3ManagementRequest[Done](
       bucket = bucket,
       method = HttpMethods.DELETE,
-      httpRequest = uploadManagementRequest(bucket, key, uploadId),
-      headers.headersFor(DeleteBucket),
+      httpRequest = uploadManagementRequest(bucket, key, uploadId, headers, 
DeleteBucket),
       process = processS3LifecycleResponse)
 
   def deleteUpload(bucket: String, key: String, uploadId: String, headers: 
S3Headers)(implicit mat: Materializer,
       attr: Attributes): Future[Done] =
     deleteUploadSource(bucket, key, uploadId, 
headers).withAttributes(attr).runWith(Sink.ignore)
 
-  private def bucketVersioningRequest(bucket: String, mfaStatus: 
Option[MFAStatus], headers: S3Headers)(
+  private def bucketVersioningRequest(bucket: String, mfaStatus: 
Option[MFAStatus], headers: S3Headers,
+      s3Request: S3Request)(
       method: HttpMethod,
       conf: S3Settings): HttpRequest =
-    HttpRequests.bucketVersioningRequest(bucket, mfaStatus, method, 
headers.headers)(conf)
+    HttpRequests.bucketVersioningRequest(bucket, mfaStatus, method, 
headers.headersFor(s3Request)(conf))(conf)
 
   def putBucketVersioningSource(
       bucket: String, bucketVersioning: BucketVersioning, headers: S3Headers): 
Source[Done, NotUsed] =
     s3ManagementRequest[Done](
       bucket = bucket,
       method = HttpMethods.PUT,
-      httpRequest = bucketVersioningRequest(bucket, 
bucketVersioning.mfaDelete, headers),
-      headers.headersFor(PutBucketVersioning),
+      httpRequest = bucketVersioningRequest(bucket, 
bucketVersioning.mfaDelete, headers, PutBucketVersioning),
       process = processS3LifecycleResponse,
       httpEntity = 
Some(putBucketVersioningPayload(bucketVersioning)(ExecutionContexts.parasitic)))
 
@@ -813,8 +815,7 @@ import scala.util.{ Failure, Success, Try }
     s3ManagementRequest[BucketVersioningResult](
       bucket = bucket,
       method = HttpMethods.GET,
-      httpRequest = bucketVersioningRequest(bucket, None, headers),
-      headers.headersFor(GetBucketVersioning),
+      httpRequest = bucketVersioningRequest(bucket, None, headers, 
GetBucketVersioning),
       process = { (response: HttpResponse, mat: Materializer) =>
         response match {
           case HttpResponse(status, _, entity, _) if status.isSuccess() =>
@@ -833,7 +834,6 @@ import scala.util.{ Failure, Success, Try }
       bucket: String,
       method: HttpMethod,
       httpRequest: (HttpMethod, S3Settings) => HttpRequest,
-      headers: Seq[HttpHeader],
       process: (HttpResponse, Materializer) => Future[T],
       httpEntity: Option[Future[RequestEntity]] = None): Source[T, NotUsed] =
     Source
@@ -1337,7 +1337,8 @@ import scala.util.{ Failure, Success, Try }
   private def requestInfoOrUploadState(s3Location: S3Location,
       contentType: ContentType,
       s3Headers: S3Headers,
-      initialUploadState: Option[(String, Int)]): Source[(MultipartUpload, 
Int), NotUsed] = {
+      initialUploadState: Option[(String, Int)])(
+      implicit s3Settings: S3Settings): Source[(MultipartUpload, Int), 
NotUsed] = {
     initialUploadState match {
       case Some((uploadId, initialIndex)) =>
         // We are resuming from a previously aborted Multipart upload so 
rather than creating a new MultipartUpload
@@ -1504,15 +1505,14 @@ import scala.util.{ Failure, Success, Try }
       contentType: ContentType,
       s3Headers: S3Headers,
       partitions: Source[List[CopyPartition], NotUsed]) = {
-    val requestInfo: Source[(MultipartUpload, Int), NotUsed] =
-      initiateUpload(location, contentType, 
s3Headers.headersFor(InitiateMultipartUpload))
-
-    val headers = s3Headers.headersFor(CopyPart)
 
     Source
       .fromMaterializer { (mat, attr) =>
         implicit val conf: S3Settings = resolveSettings(attr, mat.system)
 
+        val headers = s3Headers.headersFor(CopyPart)
+        val requestInfo: Source[(MultipartUpload, Int), NotUsed] =
+          initiateUpload(location, contentType, 
s3Headers.headersFor(InitiateMultipartUpload))
         requestInfo
           .zipWith(partitions) {
             case ((upload, _), ls) =>
diff --git 
a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/settings.scala 
b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/settings.scala
index 5e32a228a..8d19d685f 100644
--- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/settings.scala
+++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/settings.scala
@@ -22,7 +22,10 @@ import org.apache.pekko
 import pekko.actor.{ ActorSystem, ClassicActorSystemProvider }
 import pekko.http.scaladsl.model.Uri
 import pekko.stream.connectors.s3.AccessStyle.{ PathAccessStyle, 
VirtualHostAccessStyle }
+import pekko.stream.connectors.s3.impl.S3Request
 import pekko.util.OptionConverters._
+import pekko.util.OptionVal
+import pekko.util.ccompat.JavaConverters._
 import com.typesafe.config.Config
 import org.slf4j.LoggerFactory
 import software.amazon.awssdk.auth.credentials._
@@ -359,7 +362,9 @@ final class S3Settings private (
     val validateObjectKey: Boolean,
     val retrySettings: RetrySettings,
     val multipartUploadSettings: MultipartUploadSettings,
-    val signAnonymousRequests: Boolean) {
+    val signAnonymousRequests: Boolean,
+    val allowedHeaders: Map[String, Set[String]]
+) {
 
   /** Java API */
   def getBufferType: BufferType = bufferType
@@ -420,6 +425,17 @@ final class S3Settings private (
   def withSignAnonymousRequests(value: Boolean): S3Settings =
     if (signAnonymousRequests == value) this else copy(signAnonymousRequests = 
value)
 
+  private[s3] val concreteAllowedHeaders: Map[S3Request, Set[String]] = {
+    allowedHeaders.foldLeft(Map.empty[S3Request, Set[String]]) {
+      case (acc, (header, value)) =>
+        S3Request.fromString(header) match {
+          case OptionVal.Some(header) => acc + (header -> value)
+          case OptionVal.None         => acc
+          case other                  => throw new MatchError(other)
+        }
+    }
+  }
+
   private def copy(
       bufferType: BufferType = bufferType,
       credentialsProvider: AwsCredentialsProvider = credentialsProvider,
@@ -431,7 +447,9 @@ final class S3Settings private (
       validateObjectKey: Boolean = validateObjectKey,
       retrySettings: RetrySettings = retrySettings,
       multipartUploadSettings: MultipartUploadSettings = 
multipartUploadSettings,
-      signAnonymousRequests: Boolean = signAnonymousRequests): S3Settings = 
new S3Settings(
+      signAnonymousRequests: Boolean = signAnonymousRequests,
+      allowedHeaders: Map[String, Set[String]] = allowedHeaders
+  ): S3Settings = new S3Settings(
     bufferType,
     credentialsProvider,
     s3RegionProvider,
@@ -442,7 +460,9 @@ final class S3Settings private (
     validateObjectKey,
     retrySettings,
     multipartUploadSettings,
-    signAnonymousRequests)
+    signAnonymousRequests,
+    allowedHeaders
+  )
 
   override def toString: String =
     "S3Settings(" +
@@ -455,8 +475,15 @@ final class S3Settings private (
     s"forwardProxy=$forwardProxy," +
     s"validateObjectKey=$validateObjectKey" +
     s"retrySettings=$retrySettings" +
-    s"multipartUploadSettings=$multipartUploadSettings)" +
-    s"signAnonymousRequests=$signAnonymousRequests"
+    s"multipartUploadSettings=$multipartUploadSettings" +
+    s"signAnonymousRequests=$signAnonymousRequests" +
+    s"allowedHeaders=${
+        val entries = allowedHeaders.toSeq.sortBy(_._1).map { case (key, 
values) =>
+          s"$key -> Set(${values.mkString(", ")})"
+        }.mkString(", ")
+        s"Map($entries)"
+      }" +
+    ")"
 
   override def equals(other: Any): Boolean = other match {
     case that: S3Settings =>
@@ -470,7 +497,8 @@ final class S3Settings private (
       this.validateObjectKey == that.validateObjectKey &&
       Objects.equals(this.retrySettings, that.retrySettings) &&
       Objects.equals(this.multipartUploadSettings, multipartUploadSettings) &&
-      this.signAnonymousRequests == that.signAnonymousRequests
+      this.signAnonymousRequests == that.signAnonymousRequests &&
+      this.allowedHeaders == that.allowedHeaders
     case _ => false
   }
 
@@ -628,6 +656,37 @@ object S3Settings {
 
     val signAnonymousRequests = c.getBoolean("sign-anonymous-requests")
 
+    val allowedHeadersConfig = c.getConfig("allowed-headers")
+
+    val allowedHeadersBase = S3Request.allRequests.map {
+      requestType =>
+        val requestTypeString = requestType.toString()
+        val value = if (allowedHeadersConfig.hasPath(requestTypeString)) {
+          allowedHeadersConfig.getStringList(requestTypeString).asScala.toSet
+        } else {
+          Set.empty[String]
+        }
+        (requestType.toString(), value)
+    }.toMap
+
+    val additionalAllowedHeadersConfig = 
c.getConfig("additional-allowed-headers")
+
+    val additionalAllowedHeaders: Map[String, Set[String]] =
+      S3Request.allRequests.map {
+        requestType =>
+          val requestTypeString = requestType.toString()
+          val value = if 
(additionalAllowedHeadersConfig.hasPath(requestTypeString)) {
+            
additionalAllowedHeadersConfig.getStringList(requestTypeString).asScala.toSet
+          } else {
+            Set.empty[String]
+          }
+          (requestType.toString(), value)
+      }.toMap
+
+    val finalAllowedHeaders = allowedHeadersBase ++ 
additionalAllowedHeaders.map { case (k, v) =>
+      k -> (v ++ allowedHeadersBase.getOrElse(k, Set.empty[String]))
+    }
+
     new S3Settings(
       bufferType,
       credentialsProvider,
@@ -639,7 +698,9 @@ object S3Settings {
       validateObjectKey,
       retrySettings,
       multipartUploadSettings,
-      signAnonymousRequests)
+      signAnonymousRequests,
+      finalAllowedHeaders
+    )
   }
 
   /**
@@ -648,11 +709,27 @@ object S3Settings {
   def create(c: Config): S3Settings = apply(c)
 
   /** Scala API */
+
+  def apply(
+      bufferType: BufferType,
+      credentialsProvider: AwsCredentialsProvider,
+      s3RegionProvider: AwsRegionProvider,
+      listBucketApiVersion: ApiVersion): S3Settings =
+    apply(
+      bufferType,
+      credentialsProvider,
+      s3RegionProvider,
+      listBucketApiVersion,
+      allowedHeaders = Map.empty[String, Set[String]] // default value
+    )
+
   def apply(
       bufferType: BufferType,
       credentialsProvider: AwsCredentialsProvider,
       s3RegionProvider: AwsRegionProvider,
-      listBucketApiVersion: ApiVersion): S3Settings = new S3Settings(
+      listBucketApiVersion: ApiVersion,
+      allowedHeaders: Map[String, Set[String]]
+  ): S3Settings = new S3Settings(
     bufferType,
     credentialsProvider,
     s3RegionProvider,
@@ -663,18 +740,31 @@ object S3Settings {
     validateObjectKey = true,
     RetrySettings.default,
     MultipartUploadSettings(RetrySettings.default),
-    signAnonymousRequests = true)
+    signAnonymousRequests = true,
+    allowedHeaders = allowedHeaders
+  )
 
   /** Java API */
+
   def create(
       bufferType: BufferType,
       credentialsProvider: AwsCredentialsProvider,
       s3RegionProvider: AwsRegionProvider,
-      listBucketApiVersion: ApiVersion): S3Settings = apply(
+      listBucketApiVersion: ApiVersion): S3Settings =
+    create(bufferType, credentialsProvider, s3RegionProvider, listBucketApiVersion, Map.empty)
+
+  def create(
+      bufferType: BufferType,
+      credentialsProvider: AwsCredentialsProvider,
+      s3RegionProvider: AwsRegionProvider,
+      listBucketApiVersion: ApiVersion,
+      allowedHeaders: Map[String, Set[String]]): S3Settings = apply(
     bufferType,
     credentialsProvider,
     s3RegionProvider,
-    listBucketApiVersion)
+    listBucketApiVersion,
+    allowedHeaders
+  )
 
   /**
   * Scala API: Creates [[S3Settings]] from the [[com.typesafe.config.Config Config]] attached to an actor system.
diff --git a/s3/src/test/scala/docs/scaladsl/S3SinkSpec.scala b/s3/src/test/scala/docs/scaladsl/S3SinkSpec.scala
index bfb809b41..de669ee75 100644
--- a/s3/src/test/scala/docs/scaladsl/S3SinkSpec.scala
+++ b/s3/src/test/scala/docs/scaladsl/S3SinkSpec.scala
@@ -240,6 +240,8 @@ class S3SinkSpec extends S3WireMockBase with S3ClientIntegrationSpec with Option
 
     val requestPayerHeader = "x-amz-request-payer"
     val requestPayerHeaderValue = "requester"
+    val storageClassHeader = "x-amz-storage-class"
+    val storageClassHeaderValue = "STANDARD_IA"
 
     val keys = ServerSideEncryption
       .customerKeys(sseCustomerKey)
@@ -263,7 +265,13 @@ class S3SinkSpec extends S3WireMockBase with S3ClientIntegrationSpec with Option
         targetBucketKey,
         s3Headers = S3Headers()
           .withServerSideEncryption(keys)
-          .withCustomHeaders(Map(requestPayerHeader -> requestPayerHeaderValue)))
+          .withCustomHeaders(
+            Map(
+              requestPayerHeader -> requestPayerHeaderValue,
+              storageClassHeader -> storageClassHeaderValue
+            )
+          )
+      )
         .run()
 
     result.futureValue shouldBe MultipartUploadResult(targetUrl, targetBucket, 
targetBucketKey, etag, None)
@@ -286,7 +294,9 @@ class S3SinkSpec extends S3WireMockBase with S3ClientIntegrationSpec with Option
         .withHeader(sseCKeyHeader, new EqualToPattern(sseCKeyHeaderValue))
        .withHeader(sseCSourceAlgorithmHeader, new EqualToPattern(sseCSourceAlgorithmHeaderValue))
        .withHeader(sseCSourceKeyHeader, new EqualToPattern(sseCSourceKeyHeaderValue))
-        .withHeader(requestPayerHeader, new EqualToPattern(requestPayerHeaderValue)))
+        .withHeader(requestPayerHeader, new EqualToPattern(requestPayerHeaderValue))
+        .withoutHeader(storageClassHeader)
+    )
 
     // SSE headers only
     mock.verifyThat(
diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3HeadersSpec.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3HeadersSpec.scala
new file mode 100644
index 000000000..a5e7cec31
--- /dev/null
+++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3HeadersSpec.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.connectors.s3
+
+import com.typesafe.config.ConfigFactory
+import org.apache.pekko
+import org.scalatest.flatspec.AnyFlatSpecLike
+import org.scalatest.matchers.should.Matchers
+
+import pekko.stream.connectors.s3.impl._
+
+class S3HeadersSpec extends AnyFlatSpecLike with Matchers {
+  it should "filter headers based on what's allowed" in {
+    val testOverrideConfig = ConfigFactory.parseString("""
+      | pekko.connectors.s3 {
+      |  allowed-headers {
+      |    GetObject = [base]
+      |    HeadObject = [base]
+      |    PutObject = [base]
+      |    InitiateMultipartUpload = [base]
+      |    UploadPart = [base]
+      |    CopyPart = [base]
+      |    DeleteObject = [base]
+      |    ListBucket = [base]
+      |    MakeBucket = [base]
+      |    DeleteBucket = [base]
+      |    CheckBucket = [base]
+      |    PutBucketVersioning = [base]
+      |    GetBucketVersioning = [base]
+      | }
+      | additional-allowed-headers {
+      |    GetObject = [allowedExtra]
+      |    HeadObject = [allowedExtra]
+      |    PutObject = [allowedExtra]
+      |    InitiateMultipartUpload = [allowedExtra]
+      |    UploadPart = [allowedExtra]
+      |    CopyPart = [allowedExtra]
+      |    DeleteObject = [allowedExtra]
+      |    ListBucket = [allowedExtra]
+      |    MakeBucket = [allowedExtra]
+      |    DeleteBucket = [allowedExtra]
+      |    CheckBucket = [allowedExtra]
+      |    PutBucketVersioning = [allowedExtra]
+      |    GetBucketVersioning = [allowedExtra]
+      | }
+      |}
+      |""".stripMargin)
+
+    val defaultConfig = ConfigFactory.load()
+    val finalConfig = testOverrideConfig.withFallback(defaultConfig)
+
+    S3Request.allRequests.foreach {
+      requestType =>
+        val extraHeaders = Map("allowedExtra" -> "allGood", "notAllowed" -> "shouldBeGone")
+        val header = S3Headers().withCustomHeaders(Map("base" -> requestType.toString()) ++ extraHeaders)
+        val s3Config = finalConfig.getConfig("pekko.connectors.s3")
+        val headerFilter = header.headersFor(requestType)(S3Settings.apply(s3Config))
+        val result = headerFilter.map(header => (header.name(), header.value()))
+        result should contain allElementsOf (Seq("base" -> requestType.toString) ++ Seq("allowedExtra" -> "allGood"))
+    }
+
+  }
+
+  it should "be able to convert all headers toString and back correctly" in {
+    val roundTrip = S3Request.allRequests
+      .map(_.toString())
+      .flatMap(S3Request.fromString(_).toOption)
+
+    roundTrip should contain allElementsOf S3Request.allRequests
+
+  }
+
+  it should "contain all S3Request types" in {
+    val actualSet = S3Request.allRequests
+
+    actualSet should have size 13
+
+    // Use exhaustive match to verify all are present
+    def verifyAllPresent(req: S3Request): Boolean = req match {
+      case e @ GetObject               => actualSet.contains(e)
+      case e @ HeadObject              => actualSet.contains(e)
+      case e @ PutObject               => actualSet.contains(e)
+      case e @ InitiateMultipartUpload => actualSet.contains(e)
+      case e @ UploadPart              => actualSet.contains(e)
+      case e @ CopyPart                => actualSet.contains(e)
+      case e @ DeleteObject            => actualSet.contains(e)
+      case e @ ListBucket              => actualSet.contains(e)
+      case e @ MakeBucket              => actualSet.contains(e)
+      case e @ DeleteBucket            => actualSet.contains(e)
+      case e @ CheckBucket             => actualSet.contains(e)
+      case e @ PutBucketVersioning     => actualSet.contains(e)
+      case e @ GetBucketVersioning     => actualSet.contains(e)
+    }
+
+    actualSet.foreach(req => verifyAllPresent(req) shouldBe true)
+
+  }
+}
diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3SettingsSpec.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3SettingsSpec.scala
index cfdb66f5e..9dd1bd6bb 100644
--- a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3SettingsSpec.scala
+++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/S3SettingsSpec.scala
@@ -41,6 +41,36 @@ class S3SettingsSpec extends S3WireMockBase with S3ClientIntegrationSpec with Op
           |}
           |multipart-upload.retry-settings = $${retry-settings}
           |sign-anonymous-requests = true
+          |allowed-headers {
+          |    GetObject = [GetObject0]
+          |    HeadObject = [HeadObject0]
+          |    PutObject = [PutObject0]
+          |    InitiateMultipartUpload = [InitiateMultipartUpload0]
+          |    UploadPart = [UploadPart0]
+          |    CopyPart = [CopyPart0]
+          |    DeleteObject = [DeleteObject0]
+          |    ListBucket = [ListBucket0]
+          |    MakeBucket = [MakeBucket0]
+          |    DeleteBucket = [DeleteBucket0]
+          |    CheckBucket = [CheckBucket0]
+          |    PutBucketVersioning = [PutBucketVersioning0]
+          |    GetBucketVersioning = [GetBucketVersioning0]
+          |}
+          |additional-allowed-headers {
+          |    GetObject = []
+          |    HeadObject = []
+          |    PutObject = []
+          |    InitiateMultipartUpload = []
+          |    UploadPart = []
+          |    CopyPart = []
+          |    DeleteObject = []
+          |    ListBucket = []
+          |    MakeBucket = []
+          |    DeleteBucket = []
+          |    CheckBucket = []
+          |    PutBucketVersioning = []
+          |    GetBucketVersioning = []
+          |}
           |$more
         """.stripMargin)
         .resolve)
@@ -273,4 +303,33 @@ class S3SettingsSpec extends S3WireMockBase with S3ClientIntegrationSpec with Op
     val settings = mkSettings("sign-anonymous-requests = false")
     settings.signAnonymousRequests shouldBe false
   }
+
+  it should "parse additional-allowed-headers" in {
+    val settings = mkSettings("""
+          |additional-allowed-headers {
+          |    GetObject = [GetObject1, GetObject2]
+          |    HeadObject = [HeadObject1, HeadObject2]
+          |    PutObject = [PutObject1, PutObject2]
+          |    InitiateMultipartUpload = [InitiateMultipartUpload1, InitiateMultipartUpload2 ]
+          |    UploadPart = [UploadPart1, UploadPart2]
+          |    CopyPart = [CopyPart1, CopyPart2]
+          |    DeleteObject = [DeleteObject1, DeleteObject2]
+          |    ListBucket = [ListBucket1, ListBucket2]
+          |    MakeBucket = [MakeBucket1, MakeBucket2]
+          |    DeleteBucket = [DeleteBucket1, DeleteBucket2]
+          |    CheckBucket = [CheckBucket1, CheckBucket2]
+          |    PutBucketVersioning = [PutBucketVersioning1, PutBucketVersioning2]
+          |    GetBucketVersioning = [GetBucketVersioning1, GetBucketVersioning2]
+          |}""".stripMargin)
+
+    settings.allowedHeaders.keySet should contain allElementsOf (pekko.stream.connectors.s3.impl.S3Request.allRequests.map(
+      _.toString()))
+    settings.allowedHeaders.foreach {
+      case (key, value) =>
+        assert(value.nonEmpty)
+        val result = value.map(_.replaceAll(key, "").toInt)
+        result should contain allElementsOf (Seq(0, 1, 2))
+    }
+  }
+
 }
diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequestsSpec.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequestsSpec.scala
index f5d851291..e901e2764 100644
--- a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequestsSpec.scala
+++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/HttpRequestsSpec.scala
@@ -46,7 +46,7 @@ class HttpRequestsSpec extends AnyFlatSpec with Matchers with ScalaFutures with
       def getRegion = s3Region
     }
 
-    S3Settings(bufferType, awsCredentials, regionProvider, listBucketApiVersion)
+    S3Settings(bufferType, awsCredentials, regionProvider, listBucketApiVersion, Map.empty)
   }
 
   val location = S3Location("bucket", "image-1024@2x")
diff --git a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/S3StreamSpec.scala b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/S3StreamSpec.scala
index 5c24dbc7f..5801061fc 100644
--- a/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/S3StreamSpec.scala
+++ b/s3/src/test/scala/org/apache/pekko/stream/connectors/s3/impl/S3StreamSpec.scala
@@ -63,7 +63,7 @@ class S3StreamSpec(_system: ActorSystem)
     val location = S3Location("test-bucket", "test-key")
 
     implicit val settings: S3Settings =
-      S3Settings(MemoryBufferType, credentialsProvider, regionProvider, ApiVersion.ListBucketVersion2)
+      S3Settings(MemoryBufferType, credentialsProvider, regionProvider, ApiVersion.ListBucketVersion2, Map.empty)
 
     val result: HttpRequest = S3Stream.invokePrivate(requestHeaders(getDownloadRequest(location), None))
     result.headers.size shouldBe 2
@@ -87,7 +87,7 @@ class S3StreamSpec(_system: ActorSystem)
     val range = ByteRange(1, 4)
 
     implicit val settings: S3Settings =
-      S3Settings(MemoryBufferType, credentialsProvider, regionProvider, ApiVersion.ListBucketVersion2)
+      S3Settings(MemoryBufferType, credentialsProvider, regionProvider, ApiVersion.ListBucketVersion2, Map.empty)
 
     val result: HttpRequest = S3Stream.invokePrivate(requestHeaders(getDownloadRequest(location), Some(range)))
     result.headers.size shouldBe 3


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to